X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/0c9ca0e11ca290392e2747596b89d18db175af7e..6208b3a77644afe5c7c28688cd6d7554a0281953:/redis.c?ds=sidebyside diff --git a/redis.c b/redis.c index f432825f..7c382af7 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,9 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "0.08" +#define REDIS_VERSION "0.091" + +#include "fmacros.h" #include #include @@ -46,6 +48,7 @@ #include #include #include +#include #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@ -53,6 +56,7 @@ #include "dict.h" /* Hash tables */ #include "adlist.h" /* Linked lists */ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ +#include "lzf.h" /* Error codes */ #define REDIS_OK 0 @@ -61,13 +65,14 @@ /* Static server configuration */ #define REDIS_SERVERPORT 6379 /* TCP port */ #define REDIS_MAXIDLETIME (60*5) /* default client timeout */ -#define REDIS_QUERYBUF_LEN 1024 +#define REDIS_IOBUF_LEN 1024 #define REDIS_LOADBUF_LEN 1024 #define REDIS_MAX_ARGS 16 #define REDIS_DEFAULT_DBNUM 16 #define REDIS_CONFIGLINE_MAX 1024 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */ #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */ +#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */ /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ @@ -82,20 +87,59 @@ #define REDIS_LIST 1 #define REDIS_SET 2 #define REDIS_HASH 3 + +/* Object types only used for dumping to disk */ +#define REDIS_EXPIRETIME 253 #define REDIS_SELECTDB 254 #define REDIS_EOF 255 +/* Defines related to the dump file format. To store 32 bits lengths for short + * keys requires a lot of space, so we check the most significant 2 bits of + * the first byte to interpreter the length: + * + * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte + * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte + * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow + * 11|000000 this means: specially encoded object will follow. The six bits + * number specify the kind of object that follows. + * See the REDIS_RDB_ENC_* defines. + * + * Lenghts up to 63 are stored using a single byte, most DB keys, and may + * values, will fit inside. */ +#define REDIS_RDB_6BITLEN 0 +#define REDIS_RDB_14BITLEN 1 +#define REDIS_RDB_32BITLEN 2 +#define REDIS_RDB_ENCVAL 3 +#define REDIS_RDB_LENERR UINT_MAX + +/* When a length of a string object stored on disk has the first two bits + * set, the remaining two bits specify a special encoding for the object + * accordingly to the following defines: */ +#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */ +#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */ +#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ +#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ + /* Client flags */ #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */ #define REDIS_SLAVE 2 /* This client is a slave server */ #define REDIS_MASTER 4 /* This client is a master server */ #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */ -/* Server replication state */ +/* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ #define REDIS_REPL_CONNECT 1 /* Must connect to master */ #define REDIS_REPL_CONNECTED 2 /* Connected to master */ +/* Slave replication state - from the point of view of master + * Note that in SEND_BULK and ONLINE state the slave receives new updates + * in its output queue. In the WAIT_BGSAVE state instead the server is waiting + * to start the next background saving in order to send updates to it. */ +#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */ +#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */ +#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */ +#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */ + /* List related stuff */ #define REDIS_HEAD 0 #define REDIS_TAIL 1 @@ -121,26 +165,37 @@ /* A redis object, that is a type able to hold a string / list / set */ typedef struct redisObject { - int type; void *ptr; + int type; int refcount; } robj; +typedef struct redisDb { + dict *dict; + dict *expires; + int id; +} redisDb; + /* With multiplexing we need to take per-clinet state. * Clients are taken in a liked list. */ typedef struct redisClient { int fd; - dict *dict; + redisDb *db; int dictid; sds querybuf; robj *argv[REDIS_MAX_ARGS]; int argc; - int bulklen; /* bulk read len. -1 if not in bulk read mode */ + int bulklen; /* bulk read len. -1 if not in bulk read mode */ list *reply; int sentlen; time_t lastinteraction; /* time of the last interaction, used for timeout */ - int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */ - int slaveseldb; /* slave selected db, if this client is a slave */ + int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */ + int slaveseldb; /* slave selected db, if this client is a slave */ + int authenticated; /* when requirepass is non-NULL */ + int replstate; /* replication state if this is a slave */ + int repldbfd; /* replication DB file descriptor */ + long repldboff; /* replication DB file offset */ + off_t repldbsize; /* replication DB file size */ } redisClient; struct saveparam { @@ -152,7 +207,9 @@ struct saveparam { struct redisServer { int port; int fd; - dict **dict; + redisDb *db; + dict *sharingpool; + unsigned int sharingpoolsize; long long dirty; /* changes to DB from the last save */ list *clients; list *slaves, *monitors; @@ -161,7 +218,7 @@ struct redisServer { int cronloops; /* number of times the cron function run */ list *objfreelist; /* A list of freed objects to avoid malloc() */ time_t lastsave; /* Unix time of last save succeeede */ - int usedmemory; /* Used memory in megabytes */ + size_t usedmemory; /* Used memory in megabytes */ /* Fields used only for stats */ time_t stat_starttime; /* server start time */ long long stat_numcommands; /* number of processed commands */ @@ -179,11 +236,13 @@ struct redisServer { char *logfile; char *bindaddr; char *dbfilename; + char *requirepass; + int shareobjects; /* Replication related */ int isslave; char *masterhost; int masterport; - redisClient *master; + redisClient *master; /* client that is master for this slave */ int replstate; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ @@ -215,7 +274,7 @@ typedef struct _redisSortOperation { struct sharedObjectsStruct { robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space, - *colon, *minus1, *nullbulk, *nullmultibulk, + *colon, *nullbulk, *nullmultibulk, *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, *outofrangeerr, *plus, *select0, *select1, *select2, *select3, *select4, @@ -230,15 +289,24 @@ static void freeSetObject(robj *o); static void decrRefCount(void *o); static robj *createObject(int type, void *ptr); static void freeClient(redisClient *c); -static int loadDb(char *filename); +static int rdbLoad(char *filename); static void addReply(redisClient *c, robj *obj); static void addReplySds(redisClient *c, sds s); static void incrRefCount(robj *o); -static int saveDbBackground(char *filename); +static int rdbSaveBackground(char *filename); static robj *createStringObject(char *ptr, size_t len); static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); - +static robj *tryObjectSharing(robj *o); +static int removeExpire(redisDb *db, robj *key); +static int expireIfNeeded(redisDb *db, robj *key); +static int deleteIfVolatile(redisDb *db, robj *key); +static int deleteKey(redisDb *db, robj *key); +static time_t getExpire(redisDb *db, robj *key); +static int setExpire(redisDb *db, robj *key, time_t when); +static void updateSalvesWaitingBgsave(int bgsaveerr); + +static void authCommand(redisClient *c); static void pingCommand(redisClient *c); static void echoCommand(redisClient *c); static void setCommand(redisClient *c); @@ -277,6 +345,8 @@ static void sismemberCommand(redisClient *c); static void scardCommand(redisClient *c); static void sinterCommand(redisClient *c); static void sinterstoreCommand(redisClient *c); +static void sunionCommand(redisClient *c); +static void sunionstoreCommand(redisClient *c); static void syncCommand(redisClient *c); static void flushdbCommand(redisClient *c); static void flushallCommand(redisClient *c); @@ -285,6 +355,7 @@ static void lremCommand(redisClient *c); static void infoCommand(redisClient *c); static void mgetCommand(redisClient *c); static void monitorCommand(redisClient *c); +static void expireCommand(redisClient *c); /*================================= Globals ================================= */ @@ -315,6 +386,8 @@ static struct redisCommand cmdTable[] = { {"scard",scardCommand,2,REDIS_CMD_INLINE}, {"sinter",sinterCommand,-2,REDIS_CMD_INLINE}, {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE}, + {"sunion",sunionCommand,-2,REDIS_CMD_INLINE}, + {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE}, {"smembers",sinterCommand,2,REDIS_CMD_INLINE}, {"incrby",incrbyCommand,3,REDIS_CMD_INLINE}, {"decrby",decrbyCommand,3,REDIS_CMD_INLINE}, @@ -325,6 +398,7 @@ static struct redisCommand cmdTable[] = { {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE}, {"keys",keysCommand,2,REDIS_CMD_INLINE}, {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE}, + {"auth",authCommand,2,REDIS_CMD_INLINE}, {"ping",pingCommand,1,REDIS_CMD_INLINE}, {"echo",echoCommand,2,REDIS_CMD_BULK}, {"save",saveCommand,1,REDIS_CMD_INLINE}, @@ -338,6 +412,7 @@ static struct redisCommand cmdTable[] = { {"sort",sortCommand,-2,REDIS_CMD_INLINE}, {"info",infoCommand,1,REDIS_CMD_INLINE}, {"monitor",monitorCommand,1,REDIS_CMD_INLINE}, + {"expire",expireCommand,3,REDIS_CMD_INLINE}, {NULL,NULL,0,0} }; @@ -559,13 +634,11 @@ static void oom(const char *msg) { /* ====================== Redis server networking stuff ===================== */ void closeTimedoutClients(void) { redisClient *c; - listIter *li; listNode *ln; time_t now = time(NULL); - li = listGetIterator(server.clients,AL_START_HEAD); - if (!li) return; - while ((ln = listNextElement(li)) != NULL) { + listRewind(server.clients); + while ((ln = listYield(server.clients)) != NULL) { c = listNodeValue(ln); if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ (now - c->lastinteraction > server.maxidletime)) { @@ -573,11 +646,10 @@ void closeTimedoutClients(void) { freeClient(c); } } - listReleaseIterator(li); } int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { - int j, size, used, loops = server.cronloops++; + int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(id); REDIS_NOTUSED(clientData); @@ -588,26 +660,30 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL * we resize the hash table to save memory */ for (j = 0; j < server.dbnum; j++) { - size = dictGetHashTableSize(server.dict[j]); - used = dictGetHashTableUsed(server.dict[j]); + int size, used, vkeys; + + size = dictSlots(server.db[j].dict); + used = dictSize(server.db[j].dict); + vkeys = dictSize(server.db[j].expires); if (!(loops % 5) && used > 0) { - redisLog(REDIS_DEBUG,"DB %d: %d keys in %d slots HT.",j,used,size); - // dictPrintStats(server.dict); + redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size); + /* dictPrintStats(server.dict); */ } if (size && used && size > REDIS_HT_MINSLOTS && (used*100/size < REDIS_HT_MINFILL)) { redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j); - dictResize(server.dict[j]); + dictResize(server.db[j].dict); redisLog(REDIS_NOTICE,"Hash table %d resized.",j); } } /* Show information about connected clients */ if (!(loops % 5)) { - redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use", + redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use", listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), - server.usedmemory); + server.usedmemory, + dictSize(server.sharingpool)); } /* Close connections of timedout clients */ @@ -617,6 +693,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Check if a background saving in progress terminated */ if (server.bgsaveinprogress) { int statloc; + /* XXX: TODO handle the case of the saving child killed */ if (wait4(-1,&statloc,WNOHANG,NULL)) { int exitcode = WEXITSTATUS(statloc); if (exitcode == 0) { @@ -629,6 +706,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { "Background saving error"); } server.bgsaveinprogress = 0; + updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR); } } else { /* If there is not a background saving in progress check if @@ -641,11 +719,35 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { now-server.lastsave > sp->seconds) { redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, sp->seconds); - saveDbBackground(server.dbfilename); + rdbSaveBackground(server.dbfilename); break; } } } + + /* Try to expire a few timed out keys */ + for (j = 0; j < server.dbnum; j++) { + redisDb *db = server.db+j; + int num = dictSize(db->expires); + + if (num) { + time_t now = time(NULL); + + if (num > REDIS_EXPIRELOOKUPS_PER_CRON) + num = REDIS_EXPIRELOOKUPS_PER_CRON; + while (num--) { + dictEntry *de; + time_t t; + + if ((de = dictGetRandomKey(db->expires)) == NULL) break; + t = (time_t) dictGetEntryVal(de); + if (now > t) { + deleteKey(db,dictGetEntryKey(de)); + } + } + } + } + /* Check if we should connect to a MASTER */ if (server.replstate == REDIS_REPL_CONNECT) { redisLog(REDIS_NOTICE,"Connecting to MASTER..."); @@ -667,7 +769,6 @@ static void createSharedObjects(void) { shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n")); shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n")); /* no such key */ - shared.minus1 = createObject(REDIS_STRING,sdsnew("-1\r\n")); shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n")); shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew( "-ERR Operation against a key holding the wrong kind of value\r\n")); @@ -720,6 +821,8 @@ static void initServerConfig() { server.daemonize = 0; server.pidfile = "/var/run/redis.pid"; server.dbfilename = "dump.rdb"; + server.requirepass = NULL; + server.shareobjects = 0; ResetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -745,8 +848,10 @@ static void initServer() { server.objfreelist = listCreate(); createSharedObjects(); server.el = aeCreateEventLoop(); - server.dict = zmalloc(sizeof(dict*)*server.dbnum); - if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist) + server.db = zmalloc(sizeof(redisDb)*server.dbnum); + server.sharingpool = dictCreate(&setDictType,NULL); + server.sharingpoolsize = 1024; + if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist) oom("server initialization"); /* Fatal OOM */ server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); if (server.fd == -1) { @@ -754,9 +859,9 @@ static void initServer() { exit(1); } for (j = 0; j < server.dbnum; j++) { - server.dict[j] = dictCreate(&hashDictType,NULL); - if (!server.dict[j]) - oom("dictCreate"); /* Fatal OOM */ + server.db[j].dict = dictCreate(&hashDictType,NULL); + server.db[j].expires = dictCreate(&setDictType,NULL); + server.db[j].id = j; } server.cronloops = 0; server.bgsaveinprogress = 0; @@ -773,8 +878,10 @@ static void initServer() { static void emptyDb() { int j; - for (j = 0; j < server.dbnum; j++) - dictEmpty(server.dict[j]); + for (j = 0; j < server.dbnum; j++) { + dictEmpty(server.db[j].dict); + dictEmpty(server.db[j].expires); + } } /* I agree, this is a very rudimental way to load a configuration... @@ -876,6 +983,13 @@ static void loadServerConfig(char *filename) { else { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcmp(argv[0],"shareobjects") && argc == 2) { + sdstolower(argv[1]); + if (!strcmp(argv[1],"yes")) server.shareobjects = 1; + else if (!strcmp(argv[1],"no")) server.shareobjects = 0; + else { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcmp(argv[0],"daemonize") && argc == 2) { sdstolower(argv[1]); if (!strcmp(argv[1],"yes")) server.daemonize = 1; @@ -883,6 +997,8 @@ static void loadServerConfig(char *filename) { else { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcmp(argv[0],"requirepass") && argc == 2) { + server.requirepass = zstrdup(argv[1]); } else if (!strcmp(argv[0],"pidfile") && argc == 2) { server.pidfile = zstrdup(argv[1]); } else { @@ -925,6 +1041,8 @@ static void freeClient(redisClient *c) { assert(ln != NULL); listDelNode(server.clients,ln); if (c->flags & REDIS_SLAVE) { + if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) + close(c->repldbfd); list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves; ln = listSearchKey(l,c); assert(ln != NULL); @@ -939,13 +1057,13 @@ static void freeClient(redisClient *c) { static void glueReplyBuffersIfNeeded(redisClient *c) { int totlen = 0; - listNode *ln = c->reply->head, *next; + listNode *ln; robj *o; - while(ln) { + listRewind(c->reply); + while((ln = listYield(c->reply))) { o = ln->value; totlen += sdslen(o->ptr); - ln = ln->next; /* This optimization makes more sense if we don't have to copy * too much data */ if (totlen > 1024) return; @@ -954,14 +1072,12 @@ static void glueReplyBuffersIfNeeded(redisClient *c) { char buf[1024]; int copylen = 0; - ln = c->reply->head; - while(ln) { - next = ln->next; + listRewind(c->reply); + while((ln = listYield(c->reply))) { o = ln->value; memcpy(buf+copylen,o->ptr,sdslen(o->ptr)); copylen += sdslen(o->ptr); listDelNode(c->reply,ln); - ln = next; } /* Now the output buffer is empty, add the new single element */ addReplySds(c,sdsnewlen(buf,totlen)); @@ -989,7 +1105,7 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) if (c->flags & REDIS_MASTER) { nwritten = objlen - c->sentlen; } else { - nwritten = write(fd, o->ptr+c->sentlen, objlen - c->sentlen); + nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen); if (nwritten <= 0) break; } c->sentlen += nwritten; @@ -1083,13 +1199,26 @@ static int processCommand(redisClient *c) { return 1; } } + /* Let's try to share objects on the command arguments vector */ + if (server.shareobjects) { + int j; + for(j = 1; j < c->argc; j++) + c->argv[j] = tryObjectSharing(c->argv[j]); + } + /* Check if the user is authenticated */ + if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { + addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); + resetClient(c); + return 1; + } + /* Exec the command */ dirty = server.dirty; cmd->proc(c); if (server.dirty-dirty != 0 && listLength(server.slaves)) - replicationFeedSlaves(server.slaves,cmd,c->dictid,c->argv,c->argc); + replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc); if (listLength(server.monitors)) - replicationFeedSlaves(server.monitors,cmd,c->dictid,c->argv,c->argc); + replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc); server.stat_numcommands++; /* Prepare the client for the next command */ @@ -1102,7 +1231,7 @@ static int processCommand(redisClient *c) { } static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) { - listNode *ln = slaves->head; + listNode *ln; robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */ int outc = 0, j; @@ -1120,8 +1249,18 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di } outv[outc++] = shared.crlf; - while(ln) { + /* Increment all the refcounts at start and decrement at end in order to + * be sure to free objects if there is no slave in a replication state + * able to be feed with commands */ + for (j = 0; j < outc; j++) incrRefCount(outv[j]); + listRewind(slaves); + while((ln = listYield(slaves))) { redisClient *slave = ln->value; + + /* Don't feed slaves that are still waiting for BGSAVE to start */ + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; + + /* Feed all the other slaves, MONITORs and so on */ if (slave->slaveseldb != dictid) { robj *selectcmd; @@ -1146,18 +1285,18 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di slave->slaveseldb = dictid; } for (j = 0; j < outc; j++) addReply(slave,outv[j]); - ln = ln->next; } + for (j = 0; j < outc; j++) decrRefCount(outv[j]); } static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = (redisClient*) privdata; - char buf[REDIS_QUERYBUF_LEN]; + char buf[REDIS_IOBUF_LEN]; int nread; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); - nread = read(fd, buf, REDIS_QUERYBUF_LEN); + nread = read(fd, buf, REDIS_IOBUF_LEN); if (nread == -1) { if (errno == EAGAIN) { nread = 0; @@ -1247,11 +1386,15 @@ again: static int selectDb(redisClient *c, int id) { if (id < 0 || id >= server.dbnum) return REDIS_ERR; - c->dict = server.dict[id]; - c->dictid = id; + c->db = &server.db[id]; return REDIS_OK; } +static void *dupClientReplyValue(void *o) { + incrRefCount((robj*)o); + return 0; +} + static redisClient *createClient(int fd) { redisClient *c = zmalloc(sizeof(*c)); @@ -1266,8 +1409,11 @@ static redisClient *createClient(int fd) { c->sentlen = 0; c->flags = 0; c->lastinteraction = time(NULL); + c->authenticated = 0; + c->replstate = REDIS_REPL_NONE; if ((c->reply = listCreate()) == NULL) oom("listCreate"); listSetFreeMethod(c->reply,decrRefCount); + listSetDupMethod(c->reply,dupClientReplyValue); if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c, NULL) == AE_ERR) { freeClient(c); @@ -1279,6 +1425,8 @@ static redisClient *createClient(int fd) { static void addReply(redisClient *c, robj *obj) { if (listLength(c->reply) == 0 && + (c->replstate == REDIS_REPL_NONE || + c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c, NULL) == AE_ERR) return; if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail"); @@ -1349,14 +1497,6 @@ static robj *createSetObject(void) { return createObject(REDIS_SET,d); } -#if 0 -static robj *createHashObject(void) { - dict *d = dictCreate(&hashDictType,NULL); - if (!d) oom("dictCreate"); - return createObject(REDIS_SET,d); -} -#endif - static void freeStringObject(robj *o) { sdsfree(o->ptr); } @@ -1375,10 +1515,19 @@ static void freeHashObject(robj *o) { static void incrRefCount(robj *o) { o->refcount++; +#ifdef DEBUG_REFCOUNT + if (o->type == REDIS_STRING) + printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount); +#endif } static void decrRefCount(void *obj) { robj *o = obj; + +#ifdef DEBUG_REFCOUNT + if (o->type == REDIS_STRING) + printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1); +#endif if (--(o->refcount) == 0) { switch(o->type) { case REDIS_STRING: freeStringObject(o); break; @@ -1393,17 +1542,223 @@ static void decrRefCount(void *obj) { } } +/* Try to share an object against the shared objects pool */ +static robj *tryObjectSharing(robj *o) { + struct dictEntry *de; + unsigned long c; + + if (o == NULL || server.shareobjects == 0) return o; + + assert(o->type == REDIS_STRING); + de = dictFind(server.sharingpool,o); + if (de) { + robj *shared = dictGetEntryKey(de); + + c = ((unsigned long) dictGetEntryVal(de))+1; + dictGetEntryVal(de) = (void*) c; + incrRefCount(shared); + decrRefCount(o); + return shared; + } else { + /* Here we are using a stream algorihtm: Every time an object is + * shared we increment its count, everytime there is a miss we + * recrement the counter of a random object. If this object reaches + * zero we remove the object and put the current object instead. */ + if (dictSize(server.sharingpool) >= + server.sharingpoolsize) { + de = dictGetRandomKey(server.sharingpool); + assert(de != NULL); + c = ((unsigned long) dictGetEntryVal(de))-1; + dictGetEntryVal(de) = (void*) c; + if (c == 0) { + dictDelete(server.sharingpool,de->key); + } + } else { + c = 0; /* If the pool is empty we want to add this object */ + } + if (c == 0) { + int retval; + + retval = dictAdd(server.sharingpool,o,(void*)1); + assert(retval == DICT_OK); + incrRefCount(o); + } + return o; + } +} + +static robj *lookupKey(redisDb *db, robj *key) { + dictEntry *de = dictFind(db->dict,key); + return de ? dictGetEntryVal(de) : NULL; +} + +static robj *lookupKeyRead(redisDb *db, robj *key) { + expireIfNeeded(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyWrite(redisDb *db, robj *key) { + deleteIfVolatile(db,key); + return lookupKey(db,key); +} + +static int deleteKey(redisDb *db, robj *key) { + int retval; + + /* We need to protect key from destruction: after the first dictDelete() + * it may happen that 'key' is no longer valid if we don't increment + * it's count. This may happen when we get the object reference directly + * from the hash table with dictRandomKey() or dict iterators */ + incrRefCount(key); + if (dictSize(db->expires)) dictDelete(db->expires,key); + retval = dictDelete(db->dict,key); + decrRefCount(key); + + return retval == DICT_OK; +} + /*============================ DB saving/loading ============================ */ +static int rdbSaveType(FILE *fp, unsigned char type) { + if (fwrite(&type,1,1,fp) == 0) return -1; + return 0; +} + +static int rdbSaveTime(FILE *fp, time_t t) { + int32_t t32 = (int32_t) t; + if (fwrite(&t32,4,1,fp) == 0) return -1; + return 0; +} + +/* check rdbLoadLen() comments for more info */ +static int rdbSaveLen(FILE *fp, uint32_t len) { + unsigned char buf[2]; + + if (len < (1<<6)) { + /* Save a 6 bit len */ + buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6); + if (fwrite(buf,1,1,fp) == 0) return -1; + } else if (len < (1<<14)) { + /* Save a 14 bit len */ + buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6); + buf[1] = len&0xFF; + if (fwrite(buf,2,1,fp) == 0) return -1; + } else { + /* Save a 32 bit len */ + buf[0] = (REDIS_RDB_32BITLEN<<6); + if (fwrite(buf,1,1,fp) == 0) return -1; + len = htonl(len); + if (fwrite(&len,4,1,fp) == 0) return -1; + } + return 0; +} + +/* String objects in the form "2391" "-100" without any space and with a + * range of values that can fit in an 8, 16 or 32 bit signed value can be + * encoded as integers to save space */ +int rdbTryIntegerEncoding(sds s, unsigned char *enc) { + long long value; + char *endptr, buf[32]; + + /* Check if it's possible to encode this value as a number */ + value = strtoll(s, &endptr, 10); + if (endptr[0] != '\0') return 0; + snprintf(buf,32,"%lld",value); + + /* If the number converted back into a string is not identical + * then it's not possible to encode the string as integer */ + if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0; + + /* Finally check if it fits in our ranges */ + if (value >= -(1<<7) && value <= (1<<7)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8; + enc[1] = value&0xFF; + return 2; + } else if (value >= -(1<<15) && value <= (1<<15)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16; + enc[1] = value&0xFF; + enc[2] = (value>>8)&0xFF; + return 3; + } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) { + enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32; + enc[1] = value&0xFF; + enc[2] = (value>>8)&0xFF; + enc[3] = (value>>16)&0xFF; + enc[4] = (value>>24)&0xFF; + return 5; + } else { + return 0; + } +} + +static int rdbSaveLzfStringObject(FILE *fp, robj *obj) { + unsigned int comprlen, outlen; + unsigned char byte; + void *out; + + /* We require at least four bytes compression for this to be worth it */ + outlen = sdslen(obj->ptr)-4; + if (outlen <= 0) return 0; + if ((out = zmalloc(outlen)) == NULL) return 0; + comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen); + if (comprlen == 0) { + zfree(out); + return 0; + } + /* Data compressed! Let's save it on disk */ + byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF; + if (fwrite(&byte,1,1,fp) == 0) goto writeerr; + if (rdbSaveLen(fp,comprlen) == -1) goto writeerr; + if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr; + if (fwrite(out,comprlen,1,fp) == 0) goto writeerr; + zfree(out); + return comprlen; + +writeerr: + zfree(out); + return -1; +} + +/* Save a string objet as [len][data] on disk. If the object is a string + * representation of an integer value we try to safe it in a special form */ +static int rdbSaveStringObject(FILE *fp, robj *obj) { + size_t len = sdslen(obj->ptr); + int enclen; + + /* Try integer encoding */ + if (len <= 11) { + unsigned char buf[5]; + if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) { + if (fwrite(buf,enclen,1,fp) == 0) return -1; + return 0; + } + } + + /* Try LZF compression - under 20 bytes it's unable to compress even + * aaaaaaaaaaaaaaaaaa so skip it */ + if (len > 20) { + int retval; + + retval = rdbSaveLzfStringObject(fp,obj); + if (retval == -1) return -1; + if (retval > 0) return 0; + /* retval == 0 means data can't be compressed, save the old way */ + } + + /* Store verbatim */ + if (rdbSaveLen(fp,len) == -1) return -1; + if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1; + return 0; +} + /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ -static int saveDb(char *filename) { +static int rdbSave(char *filename) { dictIterator *di = NULL; dictEntry *de; - uint32_t len; - uint8_t type; FILE *fp; char tmpfile[256]; int j; + time_t now = time(NULL); snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random()); fp = fopen(tmpfile,"w"); @@ -1411,10 +1766,11 @@ static int saveDb(char *filename) { redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno)); return REDIS_ERR; } - if (fwrite("REDIS0000",9,1,fp) == 0) goto werr; + if (fwrite("REDIS0001",9,1,fp) == 0) goto werr; for (j = 0; j < server.dbnum; j++) { - dict *d = server.dict[j]; - if (dictGetHashTableUsed(d) == 0) continue; + redisDb *db = server.db+j; + dict *d = db->dict; + if (dictSize(d) == 0) continue; di = dictGetIterator(d); if (!di) { fclose(fp); @@ -1422,60 +1778,52 @@ static int saveDb(char *filename) { } /* Write the SELECT DB opcode */ - type = REDIS_SELECTDB; - len = htonl(j); - if (fwrite(&type,1,1,fp) == 0) goto werr; - if (fwrite(&len,4,1,fp) == 0) goto werr; + if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr; + if (rdbSaveLen(fp,j) == -1) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { robj *key = dictGetEntryKey(de); robj *o = dictGetEntryVal(de); - - type = o->type; - len = htonl(sdslen(key->ptr)); - if (fwrite(&type,1,1,fp) == 0) goto werr; - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (fwrite(key->ptr,sdslen(key->ptr),1,fp) == 0) goto werr; - if (type == REDIS_STRING) { + time_t expiretime = getExpire(db,key); + + /* Save the expire time */ + if (expiretime != -1) { + /* If this key is already expired skip it */ + if (expiretime < now) continue; + if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr; + if (rdbSaveTime(fp,expiretime) == -1) goto werr; + } + /* Save the key and associated value */ + if (rdbSaveType(fp,o->type) == -1) goto werr; + if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (o->type == REDIS_STRING) { /* Save a string value */ - sds sval = o->ptr; - len = htonl(sdslen(sval)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(sval) && - fwrite(sval,sdslen(sval),1,fp) == 0) goto werr; - } else if (type == REDIS_LIST) { + if (rdbSaveStringObject(fp,o) == -1) goto werr; + } else if (o->type == REDIS_LIST) { /* Save a list value */ list *list = o->ptr; - listNode *ln = list->head; + listNode *ln; - len = htonl(listLength(list)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - while(ln) { + listRewind(list); + if (rdbSaveLen(fp,listLength(list)) == -1) goto werr; + while((ln = listYield(list))) { robj *eleobj = listNodeValue(ln); - len = htonl(sdslen(eleobj->ptr)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0) - goto werr; - ln = ln->next; + + if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; } - } else if (type == REDIS_SET) { + } else if (o->type == REDIS_SET) { /* Save a set value */ dict *set = o->ptr; dictIterator *di = dictGetIterator(set); dictEntry *de; if (!set) oom("dictGetIteraotr"); - len = htonl(dictGetHashTableUsed(set)); - if (fwrite(&len,4,1,fp) == 0) goto werr; + if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr; while((de = dictNext(di)) != NULL) { - robj *eleobj; + robj *eleobj = dictGetEntryKey(de); - eleobj = dictGetEntryKey(de); - len = htonl(sdslen(eleobj->ptr)); - if (fwrite(&len,4,1,fp) == 0) goto werr; - if (sdslen(eleobj->ptr) && fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0) - goto werr; + if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; } dictReleaseIterator(di); } else { @@ -1485,8 +1833,9 @@ static int saveDb(char *filename) { dictReleaseIterator(di); } /* EOF opcode */ - type = REDIS_EOF; - if (fwrite(&type,1,1,fp) == 0) goto werr; + if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr; + + /* Make sure data will not remain on the OS's output buffers */ fflush(fp); fsync(fileno(fp)); fclose(fp); @@ -1511,14 +1860,14 @@ werr: return REDIS_ERR; } -static int saveDbBackground(char *filename) { +static int rdbSaveBackground(char *filename) { pid_t childpid; if (server.bgsaveinprogress) return REDIS_ERR; if ((childpid = fork()) == 0) { /* Child */ close(server.fd); - if (saveDb(filename) == REDIS_OK) { + if (rdbSave(filename) == REDIS_OK) { exit(0); } else { exit(1); @@ -1532,84 +1881,191 @@ static int saveDbBackground(char *filename) { return REDIS_OK; /* unreached */ } -static int loadDb(char *filename) { +static int rdbLoadType(FILE *fp) { + unsigned char type; + if (fread(&type,1,1,fp) == 0) return -1; + return type; +} + +static time_t rdbLoadTime(FILE *fp) { + int32_t t32; + if (fread(&t32,4,1,fp) == 0) return -1; + return (time_t) t32; +} + +/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top + * of this file for a description of how this are stored on disk. + * + * isencoded is set to 1 if the readed length is not actually a length but + * an "encoding type", check the above comments for more info */ +static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) { + unsigned char buf[2]; + uint32_t len; + + if (isencoded) *isencoded = 0; + if (rdbver == 0) { + if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR; + return ntohl(len); + } else { + int type; + + if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR; + type = (buf[0]&0xC0)>>6; + if (type == REDIS_RDB_6BITLEN) { + /* Read a 6 bit len */ + return buf[0]&0x3F; + } else if (type == REDIS_RDB_ENCVAL) { + /* Read a 6 bit len encoding type */ + if (isencoded) *isencoded = 1; + return buf[0]&0x3F; + } else if (type == REDIS_RDB_14BITLEN) { + /* Read a 14 bit len */ + if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR; + return ((buf[0]&0x3F)<<8)|buf[1]; + } else { + /* Read a 32 bit len */ + if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR; + return ntohl(len); + } + } +} + +static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { + unsigned char enc[4]; + long long val; + + if (enctype == REDIS_RDB_ENC_INT8) { + if (fread(enc,1,1,fp) == 0) return NULL; + val = (signed char)enc[0]; + } else if (enctype == REDIS_RDB_ENC_INT16) { + uint16_t v; + if (fread(enc,2,1,fp) == 0) return NULL; + v = enc[0]|(enc[1]<<8); + val = (int16_t)v; + } else if (enctype == REDIS_RDB_ENC_INT32) { + uint32_t v; + if (fread(enc,4,1,fp) == 0) return NULL; + v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); + val = (int32_t)v; + } else { + val = 0; /* anti-warning */ + assert(0!=0); + } + return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val)); +} + +static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) { + unsigned int len, clen; + unsigned char *c = NULL; + sds val = NULL; + + if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((c = zmalloc(clen)) == NULL) goto err; + if ((val = sdsnewlen(NULL,len)) == NULL) goto err; + if (fread(c,clen,1,fp) == 0) goto err; + if (lzf_decompress(c,clen,val,len) == 0) goto err; + return createObject(REDIS_STRING,val); +err: + zfree(c); + sdsfree(val); + return NULL; +} + +static robj *rdbLoadStringObject(FILE*fp, int rdbver) { + int isencoded; + uint32_t len; + sds val; + + len = rdbLoadLen(fp,rdbver,&isencoded); + if (isencoded) { + switch(len) { + case REDIS_RDB_ENC_INT8: + case REDIS_RDB_ENC_INT16: + case REDIS_RDB_ENC_INT32: + return tryObjectSharing(rdbLoadIntegerObject(fp,len)); + case REDIS_RDB_ENC_LZF: + return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver)); + default: + assert(0!=0); + } + } + + if (len == REDIS_RDB_LENERR) return NULL; + val = sdsnewlen(NULL,len); + if (len && fread(val,len,1,fp) == 0) { + sdsfree(val); + return NULL; + } + return tryObjectSharing(createObject(REDIS_STRING,val)); +} + +static int rdbLoad(char *filename) { FILE *fp; - char buf[REDIS_LOADBUF_LEN]; /* Try to use this buffer instead of */ - char vbuf[REDIS_LOADBUF_LEN]; /* malloc() when the element is small */ - char *key = NULL, *val = NULL; - uint32_t klen,vlen,dbid; - uint8_t type; - int retval; - dict *d = server.dict[0]; + robj *keyobj = NULL; + uint32_t dbid; + int type, retval, rdbver; + dict *d = server.db[0].dict; + redisDb *db = server.db+0; + char buf[1024]; + time_t expiretime = -1, now = time(NULL); fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; if (fread(buf,9,1,fp) == 0) goto eoferr; - if (memcmp(buf,"REDIS0000",9) != 0) { + buf[9] = '\0'; + if (memcmp(buf,"REDIS",5) != 0) { fclose(fp); redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file"); return REDIS_ERR; } + rdbver = atoi(buf+5); + if (rdbver > 1) { + fclose(fp); + redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver); + return REDIS_ERR; + } while(1) { robj *o; /* Read type. */ - if (fread(&type,1,1,fp) == 0) goto eoferr; + if ((type = rdbLoadType(fp)) == -1) goto eoferr; + if (type == REDIS_EXPIRETIME) { + if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr; + /* We read the time so we need to read the object type again */ + if ((type = rdbLoadType(fp)) == -1) goto eoferr; + } if (type == REDIS_EOF) break; /* Handle SELECT DB opcode as a special case */ if (type == REDIS_SELECTDB) { - if (fread(&dbid,4,1,fp) == 0) goto eoferr; - dbid = ntohl(dbid); + if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) + goto eoferr; if (dbid >= (unsigned)server.dbnum) { - redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server compiled to handle more than %d databases. Exiting\n", server.dbnum); + redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum); exit(1); } - d = server.dict[dbid]; + db = server.db+dbid; + d = db->dict; continue; } /* Read key */ - if (fread(&klen,4,1,fp) == 0) goto eoferr; - klen = ntohl(klen); - if (klen <= REDIS_LOADBUF_LEN) { - key = buf; - } else { - key = zmalloc(klen); - if (!key) oom("Loading DB from file"); - } - if (fread(key,klen,1,fp) == 0) goto eoferr; + if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; if (type == REDIS_STRING) { /* Read string value */ - if (fread(&vlen,4,1,fp) == 0) goto eoferr; - vlen = ntohl(vlen); - if (vlen <= REDIS_LOADBUF_LEN) { - val = vbuf; - } else { - val = zmalloc(vlen); - if (!val) oom("Loading DB from file"); - } - if (vlen && fread(val,vlen,1,fp) == 0) goto eoferr; - o = createObject(REDIS_STRING,sdsnewlen(val,vlen)); + if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; } else if (type == REDIS_LIST || type == REDIS_SET) { /* Read list/set value */ uint32_t listlen; - if (fread(&listlen,4,1,fp) == 0) goto eoferr; - listlen = ntohl(listlen); + + if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) + goto eoferr; o = (type == REDIS_LIST) ? createListObject() : createSetObject(); /* Load every single element of the list/set */ while(listlen--) { robj *ele; - if (fread(&vlen,4,1,fp) == 0) goto eoferr; - vlen = ntohl(vlen); - if (vlen <= REDIS_LOADBUF_LEN) { - val = vbuf; - } else { - val = zmalloc(vlen); - if (!val) oom("Loading DB from file"); - } - if (vlen && fread(val,vlen,1,fp) == 0) goto eoferr; - ele = createObject(REDIS_STRING,sdsnewlen(val,vlen)); + if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr; if (type == REDIS_LIST) { if (!listAddNodeTail((list*)o->ptr,ele)) oom("listAddNodeTail"); @@ -1617,37 +2073,47 @@ static int loadDb(char *filename) { if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR) oom("dictAdd"); } - /* free the temp buffer if needed */ - if (val != vbuf) zfree(val); - val = NULL; } } else { assert(0 != 0); } /* Add the new object in the hash table */ - retval = dictAdd(d,createStringObject(key,klen),o); + retval = dictAdd(d,keyobj,o); if (retval == DICT_ERR) { - redisLog(REDIS_WARNING,"Loading DB, duplicated key found! Unrecoverable error, exiting now."); + redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr); exit(1); } - /* Iteration cleanup */ - if (key != buf) zfree(key); - if (val != vbuf) zfree(val); - key = val = NULL; + /* Set the expire time if needed */ + if (expiretime != -1) { + setExpire(db,keyobj,expiretime); + /* Delete this key if already expired */ + if (expiretime < now) deleteKey(db,keyobj); + expiretime = -1; + } + keyobj = o = NULL; } fclose(fp); return REDIS_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ - if (key != buf) zfree(key); - if (val != vbuf) zfree(val); - redisLog(REDIS_WARNING,"Short read loading DB. Unrecoverable error, exiting now."); + if (keyobj) decrRefCount(keyobj); + redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now."); exit(1); return REDIS_ERR; /* Just to avoid warning */ } /*================================== Commands =============================== */ +static void authCommand(redisClient *c) { + if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) { + c->authenticated = 1; + addReply(c,shared.ok); + } else { + c->authenticated = 0; + addReply(c,shared.err); + } +} + static void pingCommand(redisClient *c) { addReply(c,shared.pong); } @@ -1664,10 +2130,10 @@ static void echoCommand(redisClient *c) { static void setGenericCommand(redisClient *c, int nx) { int retval; - retval = dictAdd(c->dict,c->argv[1],c->argv[2]); + retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); if (retval == DICT_ERR) { if (!nx) { - dictReplace(c->dict,c->argv[1],c->argv[2]); + dictReplace(c->db->dict,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); } else { addReply(c,shared.czero); @@ -1678,26 +2144,24 @@ static void setGenericCommand(redisClient *c, int nx) { incrRefCount(c->argv[2]); } server.dirty++; + removeExpire(c->db,c->argv[1]); addReply(c, nx ? shared.cone : shared.ok); } static void setCommand(redisClient *c) { - return setGenericCommand(c,0); + setGenericCommand(c,0); } static void setnxCommand(redisClient *c) { - return setGenericCommand(c,1); + setGenericCommand(c,1); } static void getCommand(redisClient *c) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + robj *o = lookupKeyRead(c->db,c->argv[1]); + + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { addReply(c,shared.wrongtypeerr); } else { @@ -1709,17 +2173,14 @@ static void getCommand(redisClient *c) { } static void mgetCommand(redisClient *c) { - dictEntry *de; int j; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1)); for (j = 1; j < c->argc; j++) { - de = dictFind(c->dict,c->argv[j]); - if (de == NULL) { + robj *o = lookupKeyRead(c->db,c->argv[j]); + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { addReply(c,shared.nullbulk); } else { @@ -1732,17 +2193,14 @@ static void mgetCommand(redisClient *c) { } static void incrDecrCommand(redisClient *c, int incr) { - dictEntry *de; long long value; int retval; robj *o; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { value = 0; } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_STRING) { value = 0; } else { @@ -1754,9 +2212,10 @@ static void incrDecrCommand(redisClient *c, int incr) { value += incr; o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value)); - retval = dictAdd(c->dict,c->argv[1],o); + retval = dictAdd(c->db->dict,c->argv[1],o); if (retval == DICT_ERR) { - dictReplace(c->dict,c->argv[1],o); + dictReplace(c->db->dict,c->argv[1],o); + removeExpire(c->db,c->argv[1]); } else { incrRefCount(c->argv[1]); } @@ -1767,27 +2226,27 @@ static void incrDecrCommand(redisClient *c, int incr) { } static void incrCommand(redisClient *c) { - return incrDecrCommand(c,1); + incrDecrCommand(c,1); } static void decrCommand(redisClient *c) { - return incrDecrCommand(c,-1); + incrDecrCommand(c,-1); } static void incrbyCommand(redisClient *c) { int incr = atoi(c->argv[2]->ptr); - return incrDecrCommand(c,incr); + incrDecrCommand(c,incr); } static void decrbyCommand(redisClient *c) { int incr = atoi(c->argv[2]->ptr); - return incrDecrCommand(c,-incr); + incrDecrCommand(c,-incr); } /* ========================= Type agnostic commands ========================= */ static void delCommand(redisClient *c) { - if (dictDelete(c->dict,c->argv[1]) == DICT_OK) { + if (deleteKey(c->db,c->argv[1])) { server.dirty++; addReply(c,shared.cone); } else { @@ -1796,20 +2255,14 @@ static void delCommand(redisClient *c) { } static void existsCommand(redisClient *c) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) - addReply(c,shared.czero); - else - addReply(c,shared.cone); + addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero); } static void selectCommand(redisClient *c) { int id = atoi(c->argv[1]->ptr); if (selectDb(c,id) == REDIS_ERR) { - addReplySds(c,"-ERR invalid DB index\r\n"); + addReplySds(c,sdsnew("-ERR invalid DB index\r\n")); } else { addReply(c,shared.ok); } @@ -1817,9 +2270,13 @@ static void selectCommand(redisClient *c) { static void randomkeyCommand(redisClient *c) { dictEntry *de; - - de = dictGetRandomKey(c->dict); + + while(1) { + de = dictGetRandomKey(c->db->dict); + if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break; + } if (de == NULL) { + addReply(c,shared.plus); addReply(c,shared.crlf); } else { addReply(c,shared.plus); @@ -1836,20 +2293,23 @@ static void keysCommand(redisClient *c) { int numkeys = 0, keyslen = 0; robj *lenobj = createObject(REDIS_STRING,NULL); - di = dictGetIterator(c->dict); + di = dictGetIterator(c->db->dict); if (!di) oom("dictGetIterator"); addReply(c,lenobj); decrRefCount(lenobj); while((de = dictNext(di)) != NULL) { robj *keyobj = dictGetEntryKey(de); + sds key = keyobj->ptr; if ((pattern[0] == '*' && pattern[1] == '\0') || stringmatchlen(pattern,plen,key,sdslen(key),0)) { - if (numkeys != 0) - addReply(c,shared.space); - addReply(c,keyobj); - numkeys++; - keyslen += sdslen(key); + if (expireIfNeeded(c->db,keyobj) == 0) { + if (numkeys != 0) + addReply(c,shared.space); + addReply(c,keyobj); + numkeys++; + keyslen += sdslen(key); + } } } dictReleaseIterator(di); @@ -1859,7 +2319,7 @@ static void keysCommand(redisClient *c) { static void dbsizeCommand(redisClient *c) { addReplySds(c, - sdscatprintf(sdsempty(),":%lu\r\n",dictGetHashTableUsed(c->dict))); + sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict))); } static void lastsaveCommand(redisClient *c) { @@ -1868,15 +2328,13 @@ static void lastsaveCommand(redisClient *c) { } static void typeCommand(redisClient *c) { - dictEntry *de; + robj *o; char *type; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { type = "+none"; } else { - robj *o = dictGetEntryVal(de); - switch(o->type) { case REDIS_STRING: type = "+string"; break; case REDIS_LIST: type = "+list"; break; @@ -1889,7 +2347,11 @@ static void typeCommand(redisClient *c) { } static void saveCommand(redisClient *c) { - if (saveDb(server.dbfilename) == REDIS_OK) { + if (server.bgsaveinprogress) { + addReplySds(c,sdsnew("-ERR background save in progress\r\n")); + return; + } + if (rdbSave(server.dbfilename) == REDIS_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1901,7 +2363,7 @@ static void bgsaveCommand(redisClient *c) { addReplySds(c,sdsnew("-ERR background save already in progress\r\n")); return; } - if (saveDbBackground(server.dbfilename) == REDIS_OK) { + if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1910,7 +2372,8 @@ static void bgsaveCommand(redisClient *c) { static void shutdownCommand(redisClient *c) { redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); - if (saveDb(server.dbfilename) == REDIS_OK) { + /* XXX: TODO kill the child if there is a bgsave in progress */ + if (rdbSave(server.dbfilename) == REDIS_OK) { if (server.daemonize) { unlink(server.pidfile); } @@ -1923,7 +2386,6 @@ static void shutdownCommand(redisClient *c) { } static void renameGenericCommand(redisClient *c, int nx) { - dictEntry *de; robj *o; /* To use the same key as src and dst is probably an error */ @@ -1932,24 +2394,24 @@ static void renameGenericCommand(redisClient *c, int nx) { return; } - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); return; } - o = dictGetEntryVal(de); incrRefCount(o); - if (dictAdd(c->dict,c->argv[2],o) == DICT_ERR) { + deleteIfVolatile(c->db,c->argv[2]); + if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) { if (nx) { decrRefCount(o); addReply(c,shared.czero); return; } - dictReplace(c->dict,c->argv[2],o); + dictReplace(c->db->dict,c->argv[2],o); } else { incrRefCount(c->argv[2]); } - dictDelete(c->dict,c->argv[1]); + deleteKey(c->db,c->argv[1]); server.dirty++; addReply(c,nx ? shared.cone : shared.ok); } @@ -1963,21 +2425,19 @@ static void renamenxCommand(redisClient *c) { } static void moveCommand(redisClient *c) { - dictEntry *de; - robj *o, *key; - dict *src, *dst; + robj *o; + redisDb *src, *dst; int srcid; /* Obtain source and target DB pointers */ - src = c->dict; - srcid = c->dictid; + src = c->db; + srcid = c->db->id; if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) { addReply(c,shared.outofrangeerr); return; } - dst = c->dict; - c->dict = src; - c->dictid = srcid; + dst = c->db; + selectDb(c,srcid); /* Back to the source DB */ /* If the user is moving using as target the same * DB as the source DB it is probably an error. */ @@ -1987,24 +2447,23 @@ static void moveCommand(redisClient *c) { } /* Check if the element exists and get a reference */ - de = dictFind(c->dict,c->argv[1]); - if (!de) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (!o) { addReply(c,shared.czero); return; } /* Try to add the element to the target DB */ - key = dictGetEntryKey(de); - o = dictGetEntryVal(de); - if (dictAdd(dst,key,o) == DICT_ERR) { + deleteIfVolatile(dst,c->argv[1]); + if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) { addReply(c,shared.czero); return; } - incrRefCount(key); + incrRefCount(c->argv[1]); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ - dictDelete(src,c->argv[1]); + deleteKey(src,c->argv[1]); server.dirty++; addReply(c,shared.cone); } @@ -2012,11 +2471,10 @@ static void moveCommand(redisClient *c) { /* =================================== Lists ================================ */ static void pushGenericCommand(redisClient *c, int where) { robj *lobj; - dictEntry *de; list *list; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + + lobj = lookupKeyWrite(c->db,c->argv[1]); + if (lobj == NULL) { lobj = createListObject(); list = lobj->ptr; if (where == REDIS_HEAD) { @@ -2024,11 +2482,10 @@ static void pushGenericCommand(redisClient *c, int where) { } else { if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail"); } - dictAdd(c->dict,c->argv[1],lobj); + dictAdd(c->db->dict,c->argv[1],lobj); incrRefCount(c->argv[1]); incrRefCount(c->argv[2]); } else { - lobj = dictGetEntryVal(de); if (lobj->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); return; @@ -2054,15 +2511,14 @@ static void rpushCommand(redisClient *c) { } static void llenCommand(redisClient *c) { - dictEntry *de; + robj *o; list *l; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.czero); return; } else { - robj *o = dictGetEntryVal(de); if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2073,15 +2529,13 @@ static void llenCommand(redisClient *c) { } static void lindexCommand(redisClient *c) { - dictEntry *de; + robj *o; int index = atoi(c->argv[2]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2102,15 +2556,13 @@ static void lindexCommand(redisClient *c) { } static void lsetCommand(redisClient *c) { - dictEntry *de; + robj *o; int index = atoi(c->argv[2]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2134,14 +2586,12 @@ static void lsetCommand(redisClient *c) { } static void popGenericCommand(redisClient *c, int where) { - dictEntry *de; - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + robj *o; + + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2176,16 +2626,14 @@ static void rpopCommand(redisClient *c) { } static void lrangeCommand(redisClient *c) { - dictEntry *de; + robj *o; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); - - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nullmultibulk); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2225,16 +2673,14 @@ static void lrangeCommand(redisClient *c) { } static void ltrimCommand(redisClient *c) { - dictEntry *de; + robj *o; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2276,14 +2722,12 @@ static void ltrimCommand(redisClient *c) { } static void lremCommand(redisClient *c) { - dictEntry *de; + robj *o; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { - addReply(c,shared.minus1); + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { + addReply(c,shared.nokeyerr); } else { - robj *o = dictGetEntryVal(de); - if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); } else { @@ -2299,8 +2743,9 @@ static void lremCommand(redisClient *c) { } ln = fromtail ? list->tail : list->head; while (ln) { - next = fromtail ? ln->prev : ln->next; robj *ele = listNodeValue(ln); + + next = fromtail ? ln->prev : ln->next; if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) { listDelNode(list,ln); server.dirty++; @@ -2317,16 +2762,14 @@ static void lremCommand(redisClient *c) { /* ==================================== Sets ================================ */ static void saddCommand(redisClient *c) { - dictEntry *de; robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + set = lookupKeyWrite(c->db,c->argv[1]); + if (set == NULL) { set = createSetObject(); - dictAdd(c->dict,c->argv[1],set); + dictAdd(c->db->dict,c->argv[1],set); incrRefCount(c->argv[1]); } else { - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); return; @@ -2342,15 +2785,12 @@ static void saddCommand(redisClient *c) { } static void sremCommand(redisClient *c) { - dictEntry *de; + robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + set = lookupKeyWrite(c->db,c->argv[1]); + if (set == NULL) { addReply(c,shared.czero); } else { - robj *set; - - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); return; @@ -2365,15 +2805,12 @@ static void sremCommand(redisClient *c) { } static void sismemberCommand(redisClient *c) { - dictEntry *de; + robj *set; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + set = lookupKeyRead(c->db,c->argv[1]); + if (set == NULL) { addReply(c,shared.czero); } else { - robj *set; - - set = dictGetEntryVal(de); if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); return; @@ -2386,21 +2823,20 @@ static void sismemberCommand(redisClient *c) { } static void scardCommand(redisClient *c) { - dictEntry *de; + robj *o; dict *s; - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { addReply(c,shared.czero); return; } else { - robj *o = dictGetEntryVal(de); if (o->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); } else { s = o->ptr; addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", - dictGetHashTableUsed(s))); + dictSize(s))); } } } @@ -2408,7 +2844,7 @@ static void scardCommand(redisClient *c) { static int qsortCompareSetsByCardinality(const void *s1, const void *s2) { dict **d1 = (void*) s1, **d2 = (void*) s2; - return dictGetHashTableUsed(*d1)-dictGetHashTableUsed(*d2); + return dictSize(*d1)-dictSize(*d2); } static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) { @@ -2421,15 +2857,20 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r if (!dv) oom("sinterCommand"); for (j = 0; j < setsnum; j++) { robj *setobj; - dictEntry *de; - - de = dictFind(c->dict,setskeys[j]); - if (!de) { + + setobj = dstkey ? + lookupKeyWrite(c->db,setskeys[j]) : + lookupKeyRead(c->db,setskeys[j]); + if (!setobj) { zfree(dv); - addReply(c,shared.nokeyerr); + if (dstkey) { + deleteKey(c->db,dstkey); + addReply(c,shared.ok); + } else { + addReply(c,shared.nullmultibulk); + } return; } - setobj = dictGetEntryVal(de); if (setobj->type != REDIS_SET) { zfree(dv); addReply(c,shared.wrongtypeerr); @@ -2454,8 +2895,8 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r /* If we have a target key where to store the resulting set * create this key with an empty set inside */ dstset = createSetObject(); - dictDelete(c->dict,dstkey); - dictAdd(c->dict,dstkey,dstset); + deleteKey(c->db,dstkey); + dictAdd(c->db->dict,dstkey,dstset); incrRefCount(dstkey); } @@ -2485,10 +2926,12 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r } dictReleaseIterator(di); - if (!dstkey) + if (!dstkey) { lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality); - else + } else { addReply(c,shared.ok); + server.dirty++; + } zfree(dv); } @@ -2500,16 +2943,113 @@ static void sinterstoreCommand(redisClient *c) { sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]); } +static void sunionGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) { + dict **dv = zmalloc(sizeof(dict*)*setsnum); + dictIterator *di; + dictEntry *de; + robj *lenobj = NULL, *dstset = NULL; + int j, cardinality = 0; + + if (!dv) oom("sunionCommand"); + for (j = 0; j < setsnum; j++) { + robj *setobj; + + setobj = dstkey ? + lookupKeyWrite(c->db,setskeys[j]) : + lookupKeyRead(c->db,setskeys[j]); + if (!setobj) { + dv[j] = NULL; + continue; + } + if (setobj->type != REDIS_SET) { + zfree(dv); + addReply(c,shared.wrongtypeerr); + return; + } + dv[j] = setobj->ptr; + } + + /* We need a temp set object to store our union. If the dstkey + * is not NULL (that is, we are inside an SUNIONSTORE operation) then + * this set object will be the resulting object to set into the target key*/ + dstset = createSetObject(); + + /* The first thing we should output is the total number of elements... + * since this is a multi-bulk write, but at this stage we don't know + * the intersection set size, so we use a trick, append an empty object + * to the output list and save the pointer to later modify it with the + * right length */ + if (!dstkey) { + lenobj = createObject(REDIS_STRING,NULL); + addReply(c,lenobj); + decrRefCount(lenobj); + } else { + /* If we have a target key where to store the resulting set + * create this key with an empty set inside */ + deleteKey(c->db,dstkey); + dictAdd(c->db->dict,dstkey,dstset); + incrRefCount(dstkey); + server.dirty++; + } + + /* Iterate all the elements of all the sets, add every element a single + * time to the result set */ + for (j = 0; j < setsnum; j++) { + if (!dv[j]) continue; /* non existing keys are like empty sets */ + + di = dictGetIterator(dv[j]); + if (!di) oom("dictGetIterator"); + + while((de = dictNext(di)) != NULL) { + robj *ele; + + /* dictAdd will not add the same element multiple times */ + ele = dictGetEntryKey(de); + if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) { + incrRefCount(ele); + if (!dstkey) { + addReplySds(c,sdscatprintf(sdsempty(), + "$%d\r\n",sdslen(ele->ptr))); + addReply(c,ele); + addReply(c,shared.crlf); + cardinality++; + } + } + } + dictReleaseIterator(di); + } + + if (!dstkey) { + lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality); + decrRefCount(dstset); + } else { + addReply(c,shared.ok); + server.dirty++; + } + zfree(dv); +} + +static void sunionCommand(redisClient *c) { + sunionGenericCommand(c,c->argv+1,c->argc-1,NULL); +} + +static void sunionstoreCommand(redisClient *c) { + sunionGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]); +} + static void flushdbCommand(redisClient *c) { - dictEmpty(c->dict); + dictEmpty(c->db->dict); + dictEmpty(c->db->expires); + server.dirty++; addReply(c,shared.ok); - saveDb(server.dbfilename); + rdbSave(server.dbfilename); } static void flushallCommand(redisClient *c) { emptyDb(); + server.dirty++; addReply(c,shared.ok); - saveDb(server.dbfilename); + rdbSave(server.dbfilename); } redisSortOperation *createSortOperation(int type, robj *pattern) { @@ -2522,12 +3062,11 @@ redisSortOperation *createSortOperation(int type, robj *pattern) { /* Return the value associated to the key with a name obtained * substituting the first occurence of '*' in 'pattern' with 'subst' */ -robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { +robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { char *p; sds spat, ssub; robj keyobj; int prefixlen, sublen, postfixlen; - dictEntry *de; /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */ struct { long len; @@ -2535,7 +3074,6 @@ robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { char buf[REDIS_SORTKEY_MAX+1]; } keyname; - spat = pattern->ptr; ssub = subst->ptr; if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL; @@ -2555,10 +3093,8 @@ robj *lookupKeyByPattern(dict *dict, robj *pattern, robj *subst) { keyobj.type = REDIS_STRING; keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2); - de = dictFind(dict,&keyobj); - // printf("lookup '%s' => %p\n", keyname.buf,de); - if (!de) return NULL; - return dictGetEntryVal(de); + /* printf("lookup '%s' => %p\n", keyname.buf,de); */ + return lookupKeyRead(db,&keyobj); } /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with @@ -2603,7 +3139,6 @@ static int sortCompare(const void *s1, const void *s2) { /* The SORT command is the most complex command in Redis. Warning: this code * is optimized for speed and a bit less for readability */ static void sortCommand(redisClient *c) { - dictEntry *de; list *operations; int outputlen = 0; int desc = 0, alpha = 0; @@ -2614,12 +3149,11 @@ static void sortCommand(redisClient *c) { redisSortObject *vector; /* Resulting vector to sort */ /* Lookup the key to sort. It must be of the right types */ - de = dictFind(c->dict,c->argv[1]); - if (de == NULL) { + sortval = lookupKeyRead(c->db,c->argv[1]); + if (sortval == NULL) { addReply(c,shared.nokeyerr); return; } - sortval = dictGetEntryVal(de); if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); return; @@ -2684,19 +3218,20 @@ static void sortCommand(redisClient *c) { /* Load the sorting vector with all the objects to sort */ vectorlen = (sortval->type == REDIS_LIST) ? listLength((list*)sortval->ptr) : - dictGetHashTableUsed((dict*)sortval->ptr); + dictSize((dict*)sortval->ptr); vector = zmalloc(sizeof(redisSortObject)*vectorlen); if (!vector) oom("allocating objects vector for SORT"); j = 0; if (sortval->type == REDIS_LIST) { list *list = sortval->ptr; - listNode *ln = list->head; - while(ln) { + listNode *ln; + + listRewind(list); + while((ln = listYield(list))) { robj *ele = ln->value; vector[j].obj = ele; vector[j].u.score = 0; vector[j].u.cmpobj = NULL; - ln = ln->next; j++; } } else { @@ -2722,7 +3257,7 @@ static void sortCommand(redisClient *c) { if (sortby) { robj *byval; - byval = lookupKeyByPattern(c->dict,sortby,vector[j].obj); + byval = lookupKeyByPattern(c->db,sortby,vector[j].obj); if (!byval || byval->type != REDIS_STRING) continue; if (alpha) { vector[j].u.cmpobj = byval; @@ -2758,16 +3293,17 @@ static void sortCommand(redisClient *c) { outputlen = getop ? getop*(end-start+1) : end-start+1; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen)); for (j = start; j <= end; j++) { - listNode *ln = operations->head; + listNode *ln; if (!getop) { addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n", sdslen(vector[j].obj->ptr))); addReply(c,vector[j].obj); addReply(c,shared.crlf); } - while(ln) { + listRewind(operations); + while((ln = listYield(operations))) { redisSortOperation *sop = ln->value; - robj *val = lookupKeyByPattern(c->dict,sop->pattern, + robj *val = lookupKeyByPattern(c->db,sop->pattern, vector[j].obj); if (sop->type == REDIS_SORT_GET) { @@ -2782,7 +3318,6 @@ static void sortCommand(redisClient *c) { } else if (sop->type == REDIS_SORT_DEL) { /* TODO */ } - ln = ln->next; } } @@ -2804,7 +3339,7 @@ static void infoCommand(redisClient *c) { "redis_version:%s\r\n" "connected_clients:%d\r\n" "connected_slaves:%d\r\n" - "used_memory:%d\r\n" + "used_memory:%zu\r\n" "changes_since_last_save:%lld\r\n" "last_save_time:%d\r\n" "total_connections_received:%lld\r\n" @@ -2827,26 +3362,101 @@ static void infoCommand(redisClient *c) { addReply(c,shared.crlf); } -/* =============================== Replication ============================= */ +static void monitorCommand(redisClient *c) { + /* ignore MONITOR if aleady slave or in monitor mode */ + if (c->flags & REDIS_SLAVE) return; -/* Send the whole output buffer syncronously to the slave. This a general operation in theory, but it is actually useful only for replication. */ -static int flushClientOutput(redisClient *c) { - int retval; - time_t start = time(NULL); + c->flags |= (REDIS_SLAVE|REDIS_MONITOR); + c->slaveseldb = 0; + if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail"); + addReply(c,shared.ok); +} - while(listLength(c->reply)) { - if (time(NULL)-start > 5) return REDIS_ERR; /* 5 seconds timeout */ - retval = aeWait(c->fd,AE_WRITABLE,1000); - if (retval == -1) { - return REDIS_ERR; - } else if (retval & AE_WRITABLE) { - sendReplyToClient(NULL, c->fd, c, AE_WRITABLE); - } +/* ================================= Expire ================================= */ +static int removeExpire(redisDb *db, robj *key) { + if (dictDelete(db->expires,key) == DICT_OK) { + return 1; + } else { + return 0; } - return REDIS_OK; } -static int syncWrite(int fd, void *ptr, ssize_t size, int timeout) { +static int setExpire(redisDb *db, robj *key, time_t when) { + if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) { + return 0; + } else { + incrRefCount(key); + return 1; + } +} + +/* Return the expire time of the specified key, or -1 if no expire + * is associated with this key (i.e. the key is non volatile) */ +static time_t getExpire(redisDb *db, robj *key) { + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return -1; + + return (time_t) dictGetEntryVal(de); +} + +static int expireIfNeeded(redisDb *db, robj *key) { + time_t when; + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return 0; + + /* Lookup the expire */ + when = (time_t) dictGetEntryVal(de); + if (time(NULL) <= when) return 0; + + /* Delete the key */ + dictDelete(db->expires,key); + return dictDelete(db->dict,key) == DICT_OK; +} + +static int deleteIfVolatile(redisDb *db, robj *key) { + dictEntry *de; + + /* No expire? return ASAP */ + if (dictSize(db->expires) == 0 || + (de = dictFind(db->expires,key)) == NULL) return 0; + + /* Delete the key */ + server.dirty++; + dictDelete(db->expires,key); + return dictDelete(db->dict,key) == DICT_OK; +} + +static void expireCommand(redisClient *c) { + dictEntry *de; + int seconds = atoi(c->argv[2]->ptr); + + de = dictFind(c->db->dict,c->argv[1]); + if (de == NULL) { + addReply(c,shared.czero); + return; + } + if (seconds <= 0) { + addReply(c, shared.czero); + return; + } else { + time_t when = time(NULL)+seconds; + if (setExpire(c->db,c->argv[1],when)) + addReply(c,shared.cone); + else + addReply(c,shared.czero); + return; + } +} + +/* =============================== Replication ============================= */ + +static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) { ssize_t nwritten, ret = size; time_t start = time(NULL); @@ -2866,7 +3476,7 @@ static int syncWrite(int fd, void *ptr, ssize_t size, int timeout) { return ret; } -static int syncRead(int fd, void *ptr, ssize_t size, int timeout) { +static int syncRead(int fd, char *ptr, ssize_t size, int timeout) { ssize_t nread, totread = 0; time_t start = time(NULL); @@ -2909,47 +3519,165 @@ static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) { } static void syncCommand(redisClient *c) { - struct stat sb; - int fd = -1, len; - time_t start = time(NULL); - char sizebuf[32]; - /* ignore SYNC if aleady slave or in monitor mode */ if (c->flags & REDIS_SLAVE) return; - redisLog(REDIS_NOTICE,"Slave ask for syncronization"); - if (flushClientOutput(c) == REDIS_ERR || saveDb(server.dbfilename) != REDIS_OK) - goto closeconn; - - fd = open(server.dbfilename, O_RDONLY); - if (fd == -1 || fstat(fd,&sb) == -1) goto closeconn; - len = sb.st_size; - - snprintf(sizebuf,32,"$%d\r\n",len); - if (syncWrite(c->fd,sizebuf,strlen(sizebuf),5) == -1) goto closeconn; - while(len) { - char buf[1024]; - int nread; + /* SYNC can't be issued when the server has pending data to send to + * the client about already issued commands. We need a fresh reply + * buffer registering the differences between the BGSAVE and the current + * dataset, so that we can copy to other slaves if needed. */ + if (listLength(c->reply) != 0) { + addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n")); + return; + } - if (time(NULL)-start > REDIS_MAX_SYNC_TIME) goto closeconn; - nread = read(fd,buf,1024); - if (nread == -1) goto closeconn; - len -= nread; - if (syncWrite(c->fd,buf,nread,5) == -1) goto closeconn; + redisLog(REDIS_NOTICE,"Slave ask for synchronization"); + /* Here we need to check if there is a background saving operation + * in progress, or if it is required to start one */ + if (server.bgsaveinprogress) { + /* Ok a background save is in progress. Let's check if it is a good + * one for replication, i.e. if there is another slave that is + * registering differences since the server forked to save */ + redisClient *slave; + listNode *ln; + + listRewind(server.slaves); + while((ln = listYield(server.slaves))) { + slave = ln->value; + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; + } + if (ln) { + /* Perfect, the server is already registering differences for + * another slave. Set the right state, and copy the buffer. */ + listRelease(c->reply); + c->reply = listDup(slave->reply); + if (!c->reply) oom("listDup copying slave reply list"); + c->replstate = REDIS_REPL_WAIT_BGSAVE_END; + redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); + } else { + /* No way, we need to wait for the next BGSAVE in order to + * register differences */ + c->replstate = REDIS_REPL_WAIT_BGSAVE_START; + redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); + } + } else { + /* Ok we don't have a BGSAVE in progress, let's start one */ + redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); + if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { + redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); + addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n")); + return; + } + c->replstate = REDIS_REPL_WAIT_BGSAVE_END; } - if (syncWrite(c->fd,"\r\n",2,5) == -1) goto closeconn; - close(fd); + c->repldbfd = -1; c->flags |= REDIS_SLAVE; c->slaveseldb = 0; if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail"); - redisLog(REDIS_NOTICE,"Syncronization with slave succeeded"); return; +} -closeconn: - if (fd != -1) close(fd); - c->flags |= REDIS_CLOSE; - redisLog(REDIS_WARNING,"Syncronization with slave failed"); - return; +static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { + redisClient *slave = privdata; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + char buf[REDIS_IOBUF_LEN]; + ssize_t nwritten, buflen; + + if (slave->repldboff == 0) { + /* Write the bulk write count before to transfer the DB. In theory here + * we don't know how much room there is in the output buffer of the + * socket, but in pratice SO_SNDLOWAT (the minimum count for output + * operations) will never be smaller than the few bytes we need. */ + sds bulkcount; + + bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) + slave->repldbsize); + if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount)) + { + sdsfree(bulkcount); + freeClient(slave); + return; + } + sdsfree(bulkcount); + } + lseek(slave->repldbfd,slave->repldboff,SEEK_SET); + buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); + if (buflen <= 0) { + redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", + (buflen == 0) ? "premature EOF" : strerror(errno)); + freeClient(slave); + return; + } + if ((nwritten = write(fd,buf,buflen)) == -1) { + redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s", + strerror(errno)); + freeClient(slave); + return; + } + slave->repldboff += nwritten; + if (slave->repldboff == slave->repldbsize) { + close(slave->repldbfd); + slave->repldbfd = -1; + aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); + slave->replstate = REDIS_REPL_ONLINE; + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, + sendReplyToClient, slave, NULL) == AE_ERR) { + freeClient(slave); + return; + } + addReplySds(slave,sdsempty()); + redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); + } +} + +static void updateSalvesWaitingBgsave(int bgsaveerr) { + listNode *ln; + int startbgsave = 0; + + listRewind(server.slaves); + while((ln = listYield(server.slaves))) { + redisClient *slave = ln->value; + + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { + startbgsave = 1; + slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { + struct stat buf; + + if (bgsaveerr != REDIS_OK) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); + continue; + } + if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 || + fstat(slave->repldbfd,&buf) == -1) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); + continue; + } + slave->repldboff = 0; + slave->repldbsize = buf.st_size; + slave->replstate = REDIS_REPL_SEND_BULK; + aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) { + freeClient(slave); + continue; + } + } + } + if (startbgsave) { + if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { + listRewind(server.slaves); + redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed"); + while((ln = listYield(server.slaves))) { + redisClient *slave = ln->value; + + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) + freeClient(slave); + } + } + } } static int syncWithMaster(void) { @@ -3015,7 +3743,7 @@ static int syncWithMaster(void) { return REDIS_ERR; } emptyDb(); - if (loadDb(server.dbfilename) != REDIS_OK) { + if (rdbLoad(server.dbfilename) != REDIS_OK) { redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); close(fd); return REDIS_ERR; @@ -3026,16 +3754,6 @@ static int syncWithMaster(void) { return REDIS_OK; } -static void monitorCommand(redisClient *c) { - /* ignore MONITOR if aleady slave or in monitor mode */ - if (c->flags & REDIS_SLAVE) return; - - c->flags |= (REDIS_SLAVE|REDIS_MONITOR); - c->slaveseldb = 0; - if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail"); - addReply(c,shared.ok); -} - /* =================================== Main! ================================ */ static void daemonize(void) { @@ -3074,7 +3792,7 @@ int main(int argc, char **argv) { initServer(); if (server.daemonize) daemonize(); redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION); - if (loadDb(server.dbfilename) == REDIS_OK) + if (rdbLoad(server.dbfilename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from disk"); if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");