X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/575acbf1f72b783bba13aee3ddf3bb1110f114fc..f7acd60336702b273886ef49650de34b143cbf36:/redis.c diff --git a/redis.c b/redis.c index 05a623d5..b925b551 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "0.09" +#define REDIS_VERSION "0.101" #include "fmacros.h" @@ -56,7 +56,8 @@ #include "dict.h" /* Hash tables */ #include "adlist.h" /* Linked lists */ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ -#include "lzf.h" +#include "lzf.h" /* LZF compression library */ +#include "pqsort.h" /* Partial qsort for SORT+LIMIT */ /* Error codes */ #define REDIS_OK 0 @@ -65,9 +66,9 @@ /* Static server configuration */ #define REDIS_SERVERPORT 6379 /* TCP port */ #define REDIS_MAXIDLETIME (60*5) /* default client timeout */ -#define REDIS_QUERYBUF_LEN 1024 +#define REDIS_IOBUF_LEN 1024 #define REDIS_LOADBUF_LEN 1024 -#define REDIS_MAX_ARGS 16 +#define REDIS_STATIC_ARGS 4 #define REDIS_DEFAULT_DBNUM 16 #define REDIS_CONFIGLINE_MAX 1024 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */ @@ -79,8 +80,13 @@ #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */ /* Command flags */ -#define REDIS_CMD_BULK 1 -#define REDIS_CMD_INLINE 2 +#define REDIS_CMD_BULK 1 /* Bulk write command */ +#define REDIS_CMD_INLINE 2 /* Inline command */ +/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with + this flags will return an error when the 'maxmemory' option is set in the + config file and the server is using more than maxmemory bytes of memory. + In short this commands are denied on low memory conditions. */ +#define REDIS_CMD_DENYOOM 4 /* Object types */ #define REDIS_STRING 0 @@ -126,11 +132,20 @@ #define REDIS_MASTER 4 /* This client is a master server */ #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */ -/* Server replication state */ +/* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ #define REDIS_REPL_CONNECT 1 /* Must connect to master */ #define REDIS_REPL_CONNECTED 2 /* Connected to master */ +/* Slave replication state - from the point of view of master + * Note that in SEND_BULK and ONLINE state the slave receives new updates + * in its output queue. In the WAIT_BGSAVE state instead the server is waiting + * to start the next background saving in order to send updates to it. */ +#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */ +#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */ +#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */ +#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */ + /* List related stuff */ #define REDIS_HEAD 0 #define REDIS_TAIL 1 @@ -174,15 +189,19 @@ typedef struct redisClient { redisDb *db; int dictid; sds querybuf; - robj *argv[REDIS_MAX_ARGS]; + robj **argv; int argc; - int bulklen; /* bulk read len. -1 if not in bulk read mode */ + int bulklen; /* bulk read len. -1 if not in bulk read mode */ list *reply; int sentlen; time_t lastinteraction; /* time of the last interaction, used for timeout */ - int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */ - int slaveseldb; /* slave selected db, if this client is a slave */ - int authenticated; /* when requirepass is non-NULL */ + int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */ + int slaveseldb; /* slave selected db, if this client is a slave */ + int authenticated; /* when requirepass is non-NULL */ + int replstate; /* replication state if this is a slave */ + int repldbfd; /* replication DB file descriptor */ + long repldboff; /* replication DB file offset */ + off_t repldbsize; /* replication DB file size */ } redisClient; struct saveparam { @@ -229,8 +248,10 @@ struct redisServer { int isslave; char *masterhost; int masterport; - redisClient *master; + redisClient *master; /* client that is master for this slave */ int replstate; + unsigned int maxclients; + unsigned int maxmemory; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -291,6 +312,8 @@ static int deleteIfVolatile(redisDb *db, robj *key); static int deleteKey(redisDb *db, robj *key); static time_t getExpire(redisDb *db, robj *key); static int setExpire(redisDb *db, robj *key, time_t when); +static void updateSalvesWaitingBgsave(int bgsaveerr); +static void freeMemoryIfNeeded(void); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); @@ -327,10 +350,15 @@ static void typeCommand(redisClient *c); static void lsetCommand(redisClient *c); static void saddCommand(redisClient *c); static void sremCommand(redisClient *c); +static void smoveCommand(redisClient *c); static void sismemberCommand(redisClient *c); static void scardCommand(redisClient *c); static void sinterCommand(redisClient *c); static void sinterstoreCommand(redisClient *c); +static void sunionCommand(redisClient *c); +static void sunionstoreCommand(redisClient *c); +static void sdiffCommand(redisClient *c); +static void sdiffstoreCommand(redisClient *c); static void syncCommand(redisClient *c); static void flushdbCommand(redisClient *c); static void flushallCommand(redisClient *c); @@ -340,6 +368,9 @@ static void infoCommand(redisClient *c); static void mgetCommand(redisClient *c); static void monitorCommand(redisClient *c); static void expireCommand(redisClient *c); +static void getSetCommand(redisClient *c); +static void ttlCommand(redisClient *c); +static void slaveofCommand(redisClient *c); /*================================= Globals ================================= */ @@ -347,37 +378,44 @@ static void expireCommand(redisClient *c); static struct redisServer server; /* server global state */ static struct redisCommand cmdTable[] = { {"get",getCommand,2,REDIS_CMD_INLINE}, - {"set",setCommand,3,REDIS_CMD_BULK}, - {"setnx",setnxCommand,3,REDIS_CMD_BULK}, - {"del",delCommand,2,REDIS_CMD_INLINE}, + {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, + {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, + {"del",delCommand,-2,REDIS_CMD_INLINE}, {"exists",existsCommand,2,REDIS_CMD_INLINE}, - {"incr",incrCommand,2,REDIS_CMD_INLINE}, - {"decr",decrCommand,2,REDIS_CMD_INLINE}, + {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, {"mget",mgetCommand,-2,REDIS_CMD_INLINE}, - {"rpush",rpushCommand,3,REDIS_CMD_BULK}, - {"lpush",lpushCommand,3,REDIS_CMD_BULK}, + {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, + {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"rpop",rpopCommand,2,REDIS_CMD_INLINE}, {"lpop",lpopCommand,2,REDIS_CMD_INLINE}, {"llen",llenCommand,2,REDIS_CMD_INLINE}, {"lindex",lindexCommand,3,REDIS_CMD_INLINE}, - {"lset",lsetCommand,4,REDIS_CMD_BULK}, + {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"lrange",lrangeCommand,4,REDIS_CMD_INLINE}, {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE}, {"lrem",lremCommand,4,REDIS_CMD_BULK}, - {"sadd",saddCommand,3,REDIS_CMD_BULK}, + {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"srem",sremCommand,3,REDIS_CMD_BULK}, + {"smove",smoveCommand,4,REDIS_CMD_BULK}, {"sismember",sismemberCommand,3,REDIS_CMD_BULK}, {"scard",scardCommand,2,REDIS_CMD_INLINE}, - {"sinter",sinterCommand,-2,REDIS_CMD_INLINE}, - {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE}, + {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, {"smembers",sinterCommand,2,REDIS_CMD_INLINE}, - {"incrby",incrbyCommand,3,REDIS_CMD_INLINE}, - {"decrby",decrbyCommand,3,REDIS_CMD_INLINE}, + {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE}, {"select",selectCommand,2,REDIS_CMD_INLINE}, {"move",moveCommand,3,REDIS_CMD_INLINE}, {"rename",renameCommand,3,REDIS_CMD_INLINE}, {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE}, + {"expire",expireCommand,3,REDIS_CMD_INLINE}, {"keys",keysCommand,2,REDIS_CMD_INLINE}, {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE}, {"auth",authCommand,2,REDIS_CMD_INLINE}, @@ -391,10 +429,11 @@ static struct redisCommand cmdTable[] = { {"sync",syncCommand,1,REDIS_CMD_INLINE}, {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE}, {"flushall",flushallCommand,1,REDIS_CMD_INLINE}, - {"sort",sortCommand,-2,REDIS_CMD_INLINE}, + {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, {"info",infoCommand,1,REDIS_CMD_INLINE}, {"monitor",monitorCommand,1,REDIS_CMD_INLINE}, - {"expire",expireCommand,3,REDIS_CMD_INLINE}, + {"ttl",ttlCommand,2,REDIS_CMD_INLINE}, + {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE}, {NULL,NULL,0,0} }; @@ -534,7 +573,12 @@ void redisLog(int level, const char *fmt, ...) va_start(ap, fmt); if (level >= server.verbosity) { char *c = ".-*"; - fprintf(fp,"%c ",c[level]); + char buf[64]; + time_t now; + + now = time(NULL); + strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now)); + fprintf(fp,"%s %c ",buf,c[level]); vfprintf(fp, fmt, ap); fprintf(fp,"\n"); fflush(fp); @@ -616,21 +660,38 @@ static void oom(const char *msg) { /* ====================== Redis server networking stuff ===================== */ void closeTimedoutClients(void) { redisClient *c; - listIter *li; listNode *ln; time_t now = time(NULL); - li = listGetIterator(server.clients,AL_START_HEAD); - if (!li) return; - while ((ln = listNextElement(li)) != NULL) { + listRewind(server.clients); + while ((ln = listYield(server.clients)) != NULL) { c = listNodeValue(ln); if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ + !(c->flags & REDIS_MASTER) && /* no timeout for masters */ (now - c->lastinteraction > server.maxidletime)) { redisLog(REDIS_DEBUG,"Closing idle client"); freeClient(c); } } - listReleaseIterator(li); +} + +/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL + * we resize the hash table to save memory */ +void tryResizeHashTables(void) { + int j; + + for (j = 0; j < server.dbnum; j++) { + long long size, used; + + size = dictSlots(server.db[j].dict); + used = dictSize(server.db[j].dict); + if (size && used && size > REDIS_HT_MINSLOTS && + (used*100/size < REDIS_HT_MINFILL)) { + redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j); + dictResize(server.db[j].dict); + redisLog(REDIS_NOTICE,"Hash table %d resized.",j); + } + } } int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { @@ -642,10 +703,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Update the global state with the amount of used memory */ server.usedmemory = zmalloc_used_memory(); - /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL - * we resize the hash table to save memory */ + /* Show some info about non-empty databases */ for (j = 0; j < server.dbnum; j++) { - int size, used, vkeys; + long long size, used, vkeys; size = dictSlots(server.db[j].dict); used = dictSize(server.db[j].dict); @@ -654,14 +714,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size); /* dictPrintStats(server.dict); */ } - if (size && used && size > REDIS_HT_MINSLOTS && - (used*100/size < REDIS_HT_MINFILL)) { - redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j); - dictResize(server.db[j].dict); - redisLog(REDIS_NOTICE,"Hash table %d resized.",j); - } } + /* We don't want to resize the hash tables while a bacground saving + * is in progress: the saving child is created using fork() that is + * implemented with a copy-on-write semantic in most modern systems, so + * if we resize the HT while there is the saving child at work actually + * a lot of memory movements in the parent will cause a lot of pages + * copied. */ + if (!server.bgsaveinprogress) tryResizeHashTables(); + /* Show information about connected clients */ if (!(loops % 5)) { redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use", @@ -672,12 +734,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } /* Close connections of timedout clients */ - if (!(loops % 10)) + if (server.maxidletime && !(loops % 10)) closeTimedoutClients(); /* Check if a background saving in progress terminated */ if (server.bgsaveinprogress) { int statloc; + /* XXX: TODO handle the case of the saving child killed */ if (wait4(-1,&statloc,WNOHANG,NULL)) { int exitcode = WEXITSTATUS(statloc); if (exitcode == 0) { @@ -690,6 +753,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { "Background saving error"); } server.bgsaveinprogress = 0; + updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR); } } else { /* If there is not a background saving in progress check if @@ -806,6 +870,8 @@ static void initServerConfig() { server.dbfilename = "dump.rdb"; server.requirepass = NULL; server.shareobjects = 0; + server.maxclients = 0; + server.maxmemory = 0; ResetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -858,13 +924,22 @@ static void initServer() { } /* Empty the whole database */ -static void emptyDb() { +static long long emptyDb() { int j; + long long removed = 0; for (j = 0; j < server.dbnum; j++) { + removed += dictSize(server.db[j].dict); dictEmpty(server.db[j].dict); dictEmpty(server.db[j].expires); } + return removed; +} + +static int yesnotoi(char *s) { + if (!strcasecmp(s,"yes")) return 1; + else if (!strcasecmp(s,"no")) return 0; + else return -1; } /* I agree, this is a very rudimental way to load a configuration... @@ -898,44 +973,44 @@ static void loadServerConfig(char *filename) { sdstolower(argv[0]); /* Execute config directives */ - if (!strcmp(argv[0],"timeout") && argc == 2) { + if (!strcasecmp(argv[0],"timeout") && argc == 2) { server.maxidletime = atoi(argv[1]); - if (server.maxidletime < 1) { + if (server.maxidletime < 0) { err = "Invalid timeout value"; goto loaderr; } - } else if (!strcmp(argv[0],"port") && argc == 2) { + } else if (!strcasecmp(argv[0],"port") && argc == 2) { server.port = atoi(argv[1]); if (server.port < 1 || server.port > 65535) { err = "Invalid port"; goto loaderr; } - } else if (!strcmp(argv[0],"bind") && argc == 2) { + } else if (!strcasecmp(argv[0],"bind") && argc == 2) { server.bindaddr = zstrdup(argv[1]); - } else if (!strcmp(argv[0],"save") && argc == 3) { + } else if (!strcasecmp(argv[0],"save") && argc == 3) { int seconds = atoi(argv[1]); int changes = atoi(argv[2]); if (seconds < 1 || changes < 0) { err = "Invalid save parameters"; goto loaderr; } appendServerSaveParams(seconds,changes); - } else if (!strcmp(argv[0],"dir") && argc == 2) { + } else if (!strcasecmp(argv[0],"dir") && argc == 2) { if (chdir(argv[1]) == -1) { redisLog(REDIS_WARNING,"Can't chdir to '%s': %s", argv[1], strerror(errno)); exit(1); } - } else if (!strcmp(argv[0],"loglevel") && argc == 2) { - if (!strcmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG; - else if (!strcmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE; - else if (!strcmp(argv[1],"warning")) server.verbosity = REDIS_WARNING; + } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) { + if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG; + else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE; + else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING; else { err = "Invalid log level. Must be one of debug, notice, warning"; goto loaderr; } - } else if (!strcmp(argv[0],"logfile") && argc == 2) { + } else if (!strcasecmp(argv[0],"logfile") && argc == 2) { FILE *fp; server.logfile = zstrdup(argv[1]); - if (!strcmp(server.logfile,"stdout")) { + if (!strcasecmp(server.logfile,"stdout")) { zfree(server.logfile); server.logfile = NULL; } @@ -950,40 +1025,42 @@ static void loadServerConfig(char *filename) { } fclose(fp); } - } else if (!strcmp(argv[0],"databases") && argc == 2) { + } else if (!strcasecmp(argv[0],"databases") && argc == 2) { server.dbnum = atoi(argv[1]); if (server.dbnum < 1) { err = "Invalid number of databases"; goto loaderr; } - } else if (!strcmp(argv[0],"slaveof") && argc == 3) { + } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) { + server.maxclients = atoi(argv[1]); + } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) { + server.maxmemory = atoi(argv[1]); + } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) { server.masterhost = sdsnew(argv[1]); server.masterport = atoi(argv[2]); server.replstate = REDIS_REPL_CONNECT; - } else if (!strcmp(argv[0],"glueoutputbuf") && argc == 2) { - sdstolower(argv[1]); - if (!strcmp(argv[1],"yes")) server.glueoutputbuf = 1; - else if (!strcmp(argv[1],"no")) server.glueoutputbuf = 0; - else { + } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) { + if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcmp(argv[0],"shareobjects") && argc == 2) { - sdstolower(argv[1]); - if (!strcmp(argv[1],"yes")) server.shareobjects = 1; - else if (!strcmp(argv[1],"no")) server.shareobjects = 0; - else { + } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) { + if ((server.shareobjects = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcmp(argv[0],"daemonize") && argc == 2) { - sdstolower(argv[1]); - if (!strcmp(argv[1],"yes")) server.daemonize = 1; - else if (!strcmp(argv[1],"no")) server.daemonize = 0; - else { + } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) { + server.sharingpoolsize = atoi(argv[1]); + if (server.sharingpoolsize < 1) { + err = "invalid object sharing pool size"; goto loaderr; + } + } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) { + if ((server.daemonize = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcmp(argv[0],"requirepass") && argc == 2) { + } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { server.requirepass = zstrdup(argv[1]); - } else if (!strcmp(argv[0],"pidfile") && argc == 2) { + } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) { server.pidfile = zstrdup(argv[1]); + } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) { + server.dbfilename = zstrdup(argv[1]); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -1024,6 +1101,8 @@ static void freeClient(redisClient *c) { assert(ln != NULL); listDelNode(server.clients,ln); if (c->flags & REDIS_SLAVE) { + if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) + close(c->repldbfd); list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves; ln = listSearchKey(l,c); assert(ln != NULL); @@ -1033,18 +1112,19 @@ static void freeClient(redisClient *c) { server.master = NULL; server.replstate = REDIS_REPL_CONNECT; } + zfree(c->argv); zfree(c); } static void glueReplyBuffersIfNeeded(redisClient *c) { int totlen = 0; - listNode *ln = c->reply->head, *next; + listNode *ln; robj *o; - while(ln) { + listRewind(c->reply); + while((ln = listYield(c->reply))) { o = ln->value; totlen += sdslen(o->ptr); - ln = ln->next; /* This optimization makes more sense if we don't have to copy * too much data */ if (totlen > 1024) return; @@ -1053,17 +1133,16 @@ static void glueReplyBuffersIfNeeded(redisClient *c) { char buf[1024]; int copylen = 0; - ln = c->reply->head; - while(ln) { - next = ln->next; + listRewind(c->reply); + while((ln = listYield(c->reply))) { o = ln->value; memcpy(buf+copylen,o->ptr,sdslen(o->ptr)); copylen += sdslen(o->ptr); listDelNode(c->reply,ln); - ln = next; } /* Now the output buffer is empty, add the new single element */ - addReplySds(c,sdsnewlen(buf,totlen)); + o = createObject(REDIS_STRING,sdsnewlen(buf,totlen)); + if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail"); } } @@ -1119,7 +1198,7 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) static struct redisCommand *lookupCommand(char *name) { int j = 0; while(cmdTable[j].name != NULL) { - if (!strcmp(name,cmdTable[j].name)) return &cmdTable[j]; + if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j]; j++; } return NULL; @@ -1143,10 +1222,12 @@ static int processCommand(redisClient *c) { struct redisCommand *cmd; long long dirty; - sdstolower(c->argv[0]->ptr); + /* Free some memory if needed (maxmemory setting) */ + if (server.maxmemory) freeMemoryIfNeeded(); + /* The QUIT command is handled as a special case. Normal command * procs are unable to close the client connection safely */ - if (!strcmp(c->argv[0]->ptr,"quit")) { + if (!strcasecmp(c->argv[0]->ptr,"quit")) { freeClient(c); return 0; } @@ -1160,6 +1241,10 @@ static int processCommand(redisClient *c) { addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n")); resetClient(c); return 1; + } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) { + addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n")); + resetClient(c); + return 1; } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { int bulklen = atoi(c->argv[c->argc-1]->ptr); @@ -1214,9 +1299,18 @@ static int processCommand(redisClient *c) { } static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) { - listNode *ln = slaves->head; - robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */ + listNode *ln; int outc = 0, j; + robj **outv; + /* (args*2)+1 is enough room for args, spaces, newlines */ + robj *static_outv[REDIS_STATIC_ARGS*2+1]; + + if (argc <= REDIS_STATIC_ARGS) { + outv = static_outv; + } else { + outv = zmalloc(sizeof(robj*)*(argc*2+1)); + if (!outv) oom("replicationFeedSlaves"); + } for (j = 0; j < argc; j++) { if (j != 0) outv[outc++] = shared.space; @@ -1232,8 +1326,18 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di } outv[outc++] = shared.crlf; - while(ln) { + /* Increment all the refcounts at start and decrement at end in order to + * be sure to free objects if there is no slave in a replication state + * able to be feed with commands */ + for (j = 0; j < outc; j++) incrRefCount(outv[j]); + listRewind(slaves); + while((ln = listYield(slaves))) { redisClient *slave = ln->value; + + /* Don't feed slaves that are still waiting for BGSAVE to start */ + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; + + /* Feed all the other slaves, MONITORs and so on */ if (slave->slaveseldb != dictid) { robj *selectcmd; @@ -1258,18 +1362,19 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di slave->slaveseldb = dictid; } for (j = 0; j < outc; j++) addReply(slave,outv[j]); - ln = ln->next; } + for (j = 0; j < outc; j++) decrRefCount(outv[j]); + if (outv != static_outv) zfree(outv); } static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = (redisClient*) privdata; - char buf[REDIS_QUERYBUF_LEN]; + char buf[REDIS_IOBUF_LEN]; int nread; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); - nread = read(fd, buf, REDIS_QUERYBUF_LEN); + nread = read(fd, buf, REDIS_IOBUF_LEN); if (nread == -1) { if (errno == EAGAIN) { nread = 0; @@ -1317,9 +1422,14 @@ again: return; } argv = sdssplitlen(query,sdslen(query)," ",1,&argc); - sdsfree(query); if (argv == NULL) oom("sdssplitlen"); - for (j = 0; j < argc && j < REDIS_MAX_ARGS; j++) { + sdsfree(query); + + if (c->argv) zfree(c->argv); + c->argv = zmalloc(sizeof(robj*)*argc); + if (c->argv == NULL) oom("allocating arguments list for client"); + + for (j = 0; j < argc; j++) { if (sdslen(argv[j])) { c->argv[c->argc] = createObject(REDIS_STRING,argv[j]); c->argc++; @@ -1333,7 +1443,7 @@ again: * on the query buffer try to process the next command. */ if (processCommand(c) && sdslen(c->querybuf)) goto again; return; - } else if (sdslen(c->querybuf) >= 1024) { + } else if (sdslen(c->querybuf) >= 1024*32) { redisLog(REDIS_DEBUG, "Client protocol error"); freeClient(c); return; @@ -1363,6 +1473,11 @@ static int selectDb(redisClient *c, int id) { return REDIS_OK; } +static void *dupClientReplyValue(void *o) { + incrRefCount((robj*)o); + return 0; +} + static redisClient *createClient(int fd) { redisClient *c = zmalloc(sizeof(*c)); @@ -1373,13 +1488,16 @@ static redisClient *createClient(int fd) { c->fd = fd; c->querybuf = sdsempty(); c->argc = 0; + c->argv = NULL; c->bulklen = -1; c->sentlen = 0; c->flags = 0; c->lastinteraction = time(NULL); c->authenticated = 0; + c->replstate = REDIS_REPL_NONE; if ((c->reply = listCreate()) == NULL) oom("listCreate"); listSetFreeMethod(c->reply,decrRefCount); + listSetDupMethod(c->reply,dupClientReplyValue); if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c, NULL) == AE_ERR) { freeClient(c); @@ -1391,6 +1509,8 @@ static redisClient *createClient(int fd) { static void addReply(redisClient *c, robj *obj) { if (listLength(c->reply) == 0 && + (c->replstate == REDIS_REPL_NONE || + c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c, NULL) == AE_ERR) return; if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail"); @@ -1406,6 +1526,7 @@ static void addReplySds(redisClient *c, sds s) { static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; char cip[128]; + redisClient *c; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); @@ -1416,11 +1537,23 @@ static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport); - if (createClient(cfd) == NULL) { + if ((c = createClient(cfd)) == NULL) { redisLog(REDIS_WARNING,"Error allocating resoures for the client"); close(cfd); /* May be already closed, just ingore errors */ return; } + /* If maxclient directive is set and this is one client more... close the + * connection. Note that we create the client instead to check before + * for this condition, since now the socket is already set in nonblocking + * mode and we can send an error for free using the Kernel I/O */ + if (server.maxclients && listLength(server.clients) > server.maxclients) { + char *err = "-ERR max number of clients reached\r\n"; + + /* That's a best effort error message, don't check write errors */ + (void) write(c->fd,err,strlen(err)); + freeClient(c); + return; + } server.stat_numconnections++; } @@ -1663,7 +1796,7 @@ static int rdbSaveLzfStringObject(FILE *fp, robj *obj) { /* We require at least four bytes compression for this to be worth it */ outlen = sdslen(obj->ptr)-4; if (outlen <= 0) return 0; - if ((out = zmalloc(outlen)) == NULL) return 0; + if ((out = zmalloc(outlen+1)) == NULL) return 0; comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen); if (comprlen == 0) { zfree(out); @@ -1700,7 +1833,7 @@ static int rdbSaveStringObject(FILE *fp, robj *obj) { /* Try LZF compression - under 20 bytes it's unable to compress even * aaaaaaaaaaaaaaaaaa so skip it */ - if (len > 20) { + if (1 && len > 20) { int retval; retval = rdbSaveLzfStringObject(fp,obj); @@ -1767,14 +1900,14 @@ static int rdbSave(char *filename) { } else if (o->type == REDIS_LIST) { /* Save a list value */ list *list = o->ptr; - listNode *ln = list->head; + listNode *ln; + listRewind(list); if (rdbSaveLen(fp,listLength(list)) == -1) goto werr; - while(ln) { + while((ln = listYield(list))) { robj *eleobj = listNodeValue(ln); if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; - ln = ln->next; } } else if (o->type == REDIS_SET) { /* Save a set value */ @@ -1838,6 +1971,11 @@ static int rdbSaveBackground(char *filename) { } } else { /* Parent */ + if (childpid == -1) { + redisLog(REDIS_WARNING,"Can't save in background: fork: %s", + strerror(errno)); + return REDIS_ERR; + } redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid); server.bgsaveinprogress = 1; return REDIS_OK; @@ -1929,6 +2067,7 @@ static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) { if ((val = sdsnewlen(NULL,len)) == NULL) goto err; if (fread(c,clen,1,fp) == 0) goto err; if (lzf_decompress(c,clen,val,len) == 0) goto err; + zfree(c); return createObject(REDIS_STRING,val); err: zfree(c); @@ -2136,6 +2275,18 @@ static void getCommand(redisClient *c) { } } +static void getSetCommand(redisClient *c) { + getCommand(c); + if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) { + dictReplace(c->db->dict,c->argv[1],c->argv[2]); + } else { + incrRefCount(c->argv[1]); + } + incrRefCount(c->argv[2]); + server.dirty++; + removeExpire(c->db,c->argv[1]); +} + static void mgetCommand(redisClient *c) { int j; @@ -2156,7 +2307,7 @@ static void mgetCommand(redisClient *c) { } } -static void incrDecrCommand(redisClient *c, int incr) { +static void incrDecrCommand(redisClient *c, long long incr) { long long value; int retval; robj *o; @@ -2198,23 +2349,36 @@ static void decrCommand(redisClient *c) { } static void incrbyCommand(redisClient *c) { - int incr = atoi(c->argv[2]->ptr); + long long incr = strtoll(c->argv[2]->ptr, NULL, 10); incrDecrCommand(c,incr); } static void decrbyCommand(redisClient *c) { - int incr = atoi(c->argv[2]->ptr); + long long incr = strtoll(c->argv[2]->ptr, NULL, 10); incrDecrCommand(c,-incr); } /* ========================= Type agnostic commands ========================= */ static void delCommand(redisClient *c) { - if (deleteKey(c->db,c->argv[1])) { - server.dirty++; - addReply(c,shared.cone); - } else { + int deleted = 0, j; + + for (j = 1; j < c->argc; j++) { + if (deleteKey(c->db,c->argv[j])) { + server.dirty++; + deleted++; + } + } + switch(deleted) { + case 0: addReply(c,shared.czero); + break; + case 1: + addReply(c,shared.cone); + break; + default: + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted)); + break; } } @@ -2336,10 +2500,12 @@ static void bgsaveCommand(redisClient *c) { static void shutdownCommand(redisClient *c) { redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); + /* XXX: TODO kill the child if there is a bgsave in progress */ if (rdbSave(server.dbfilename) == REDIS_OK) { if (server.daemonize) { - unlink(server.pidfile); + unlink(server.pidfile); } + redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); redisLog(REDIS_WARNING,"Server exit now, bye bye..."); exit(1); } else { @@ -2767,6 +2933,41 @@ static void sremCommand(redisClient *c) { } } +static void smoveCommand(redisClient *c) { + robj *srcset, *dstset; + + srcset = lookupKeyWrite(c->db,c->argv[1]); + dstset = lookupKeyWrite(c->db,c->argv[2]); + + /* If the source key does not exist return 0, if it's of the wrong type + * raise an error */ + if (srcset == NULL || srcset->type != REDIS_SET) { + addReply(c, srcset ? shared.wrongtypeerr : shared.czero); + return; + } + /* Error if the destination key is not a set as well */ + if (dstset && dstset->type != REDIS_SET) { + addReply(c,shared.wrongtypeerr); + return; + } + /* Remove the element from the source set */ + if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) { + /* Key not found in the src set! return zero */ + addReply(c,shared.czero); + return; + } + server.dirty++; + /* Add the element to the destination set */ + if (!dstset) { + dstset = createSetObject(); + dictAdd(c->db->dict,c->argv[2],dstset); + incrRefCount(c->argv[2]); + } + if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK) + incrRefCount(c->argv[3]); + addReply(c,shared.cone); +} + static void sismemberCommand(redisClient *c) { robj *set; @@ -2817,7 +3018,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r robj *lenobj = NULL, *dstset = NULL; int j, cardinality = 0; - if (!dv) oom("sinterCommand"); + if (!dv) oom("sinterGenericCommand"); for (j = 0; j < setsnum; j++) { robj *setobj; @@ -2826,7 +3027,12 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r lookupKeyRead(c->db,setskeys[j]); if (!setobj) { zfree(dv); - addReply(c,shared.nokeyerr); + if (dstkey) { + deleteKey(c->db,dstkey); + addReply(c,shared.ok); + } else { + addReply(c,shared.nullmultibulk); + } return; } if (setobj->type != REDIS_SET) { @@ -2853,10 +3059,6 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r /* If we have a target key where to store the resulting set * create this key with an empty set inside */ dstset = createSetObject(); - deleteKey(c->db,dstkey); - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); - server.dirty++; } /* Iterate all the elements of the first (smallest) set, and test @@ -2881,15 +3083,24 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r } else { dictAdd(dstset->ptr,ele,NULL); incrRefCount(ele); - server.dirty++; } } dictReleaseIterator(di); - if (!dstkey) + if (dstkey) { + /* Store the resulting set into the target */ + deleteKey(c->db,dstkey); + dictAdd(c->db->dict,dstkey,dstset); + incrRefCount(dstkey); + } + + if (!dstkey) { lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality); - else - addReply(c,shared.ok); + } else { + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", + dictSize((dict*)dstset->ptr))); + server.dirty++; + } zfree(dv); } @@ -2901,19 +3112,132 @@ static void sinterstoreCommand(redisClient *c) { sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]); } +#define REDIS_OP_UNION 0 +#define REDIS_OP_DIFF 1 + +static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) { + dict **dv = zmalloc(sizeof(dict*)*setsnum); + dictIterator *di; + dictEntry *de; + robj *dstset = NULL; + int j, cardinality = 0; + + if (!dv) oom("sunionDiffGenericCommand"); + for (j = 0; j < setsnum; j++) { + robj *setobj; + + setobj = dstkey ? + lookupKeyWrite(c->db,setskeys[j]) : + lookupKeyRead(c->db,setskeys[j]); + if (!setobj) { + dv[j] = NULL; + continue; + } + if (setobj->type != REDIS_SET) { + zfree(dv); + addReply(c,shared.wrongtypeerr); + return; + } + dv[j] = setobj->ptr; + } + + /* We need a temp set object to store our union. If the dstkey + * is not NULL (that is, we are inside an SUNIONSTORE operation) then + * this set object will be the resulting object to set into the target key*/ + dstset = createSetObject(); + + /* Iterate all the elements of all the sets, add every element a single + * time to the result set */ + for (j = 0; j < setsnum; j++) { + if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */ + if (!dv[j]) continue; /* non existing keys are like empty sets */ + + di = dictGetIterator(dv[j]); + if (!di) oom("dictGetIterator"); + + while((de = dictNext(di)) != NULL) { + robj *ele; + + /* dictAdd will not add the same element multiple times */ + ele = dictGetEntryKey(de); + if (op == REDIS_OP_UNION || j == 0) { + if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) { + incrRefCount(ele); + cardinality++; + } + } else if (op == REDIS_OP_DIFF) { + if (dictDelete(dstset->ptr,ele) == DICT_OK) { + cardinality--; + } + } + } + dictReleaseIterator(di); + + if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */ + } + + /* Output the content of the resulting set, if not in STORE mode */ + if (!dstkey) { + addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality)); + di = dictGetIterator(dstset->ptr); + if (!di) oom("dictGetIterator"); + while((de = dictNext(di)) != NULL) { + robj *ele; + + ele = dictGetEntryKey(de); + addReplySds(c,sdscatprintf(sdsempty(), + "$%d\r\n",sdslen(ele->ptr))); + addReply(c,ele); + addReply(c,shared.crlf); + } + dictReleaseIterator(di); + } else { + /* If we have a target key where to store the resulting set + * create this key with the result set inside */ + deleteKey(c->db,dstkey); + dictAdd(c->db->dict,dstkey,dstset); + incrRefCount(dstkey); + } + + /* Cleanup */ + if (!dstkey) { + decrRefCount(dstset); + } else { + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", + dictSize((dict*)dstset->ptr))); + server.dirty++; + } + zfree(dv); +} + +static void sunionCommand(redisClient *c) { + sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION); +} + +static void sunionstoreCommand(redisClient *c) { + sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION); +} + +static void sdiffCommand(redisClient *c) { + sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF); +} + +static void sdiffstoreCommand(redisClient *c) { + sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF); +} + static void flushdbCommand(redisClient *c) { + server.dirty += dictSize(c->db->dict); dictEmpty(c->db->dict); dictEmpty(c->db->expires); - server.dirty++; addReply(c,shared.ok); - rdbSave(server.dbfilename); } static void flushallCommand(redisClient *c) { - emptyDb(); - server.dirty++; + server.dirty += emptyDb(); addReply(c,shared.ok); rdbSave(server.dbfilename); + server.dirty++; } redisSortOperation *createSortOperation(int type, robj *pattern) { @@ -3088,13 +3412,14 @@ static void sortCommand(redisClient *c) { j = 0; if (sortval->type == REDIS_LIST) { list *list = sortval->ptr; - listNode *ln = list->head; - while(ln) { + listNode *ln; + + listRewind(list); + while((ln = listYield(list))) { robj *ele = ln->value; vector[j].obj = ele; vector[j].u.score = 0; vector[j].u.cmpobj = NULL; - ln = ln->next; j++; } } else { @@ -3148,7 +3473,10 @@ static void sortCommand(redisClient *c) { server.sort_desc = desc; server.sort_alpha = alpha; server.sort_bypattern = sortby ? 1 : 0; - qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare); + if (sortby && (start != 0 || end != vectorlen-1)) + pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end); + else + qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare); } /* Send command output to the output buffer, performing the specified @@ -3156,14 +3484,15 @@ static void sortCommand(redisClient *c) { outputlen = getop ? getop*(end-start+1) : end-start+1; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen)); for (j = start; j <= end; j++) { - listNode *ln = operations->head; + listNode *ln; if (!getop) { addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n", sdslen(vector[j].obj->ptr))); addReply(c,vector[j].obj); addReply(c,shared.crlf); } - while(ln) { + listRewind(operations); + while((ln = listYield(operations))) { redisSortOperation *sop = ln->value; robj *val = lookupKeyByPattern(c->db,sop->pattern, vector[j].obj); @@ -3180,7 +3509,6 @@ static void sortCommand(redisClient *c) { } else if (sop->type == REDIS_SORT_DEL) { /* TODO */ } - ln = ln->next; } } @@ -3200,26 +3528,43 @@ static void infoCommand(redisClient *c) { info = sdscatprintf(sdsempty(), "redis_version:%s\r\n" + "uptime_in_seconds:%d\r\n" + "uptime_in_days:%d\r\n" "connected_clients:%d\r\n" "connected_slaves:%d\r\n" "used_memory:%zu\r\n" "changes_since_last_save:%lld\r\n" + "bgsave_in_progress:%d\r\n" "last_save_time:%d\r\n" "total_connections_received:%lld\r\n" "total_commands_processed:%lld\r\n" - "uptime_in_seconds:%d\r\n" - "uptime_in_days:%d\r\n" + "role:%s\r\n" ,REDIS_VERSION, + uptime, + uptime/(3600*24), listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), server.usedmemory, server.dirty, + server.bgsaveinprogress, server.lastsave, server.stat_numconnections, server.stat_numcommands, - uptime, - uptime/(3600*24) + server.masterhost == NULL ? "master" : "slave" ); + if (server.masterhost) { + info = sdscatprintf(info, + "master_host:%s\r\n" + "master_port:%d\r\n" + "master_link_status:%s\r\n" + "master_last_io_seconds_ago:%d\r\n" + ,server.masterhost, + server.masterport, + (server.replstate == REDIS_REPL_CONNECTED) ? + "up" : "down", + (int)(time(NULL)-server.master->lastinteraction) + ); + } addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info))); addReplySds(c,info); addReply(c,shared.crlf); @@ -3317,25 +3662,20 @@ static void expireCommand(redisClient *c) { } } -/* =============================== Replication ============================= */ +static void ttlCommand(redisClient *c) { + time_t expire; + int ttl = -1; -/* Send the whole output buffer syncronously to the slave. This a general operation in theory, but it is actually useful only for replication. */ -static int flushClientOutput(redisClient *c) { - int retval; - time_t start = time(NULL); - - while(listLength(c->reply)) { - if (time(NULL)-start > 5) return REDIS_ERR; /* 5 seconds timeout */ - retval = aeWait(c->fd,AE_WRITABLE,1000); - if (retval == -1) { - return REDIS_ERR; - } else if (retval & AE_WRITABLE) { - sendReplyToClient(NULL, c->fd, c, AE_WRITABLE); - } + expire = getExpire(c->db,c->argv[1]); + if (expire != -1) { + ttl = (int) (expire-time(NULL)); + if (ttl < 0) ttl = -1; } - return REDIS_OK; + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl)); } +/* =============================== Replication ============================= */ + static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) { ssize_t nwritten, ret = size; time_t start = time(NULL); @@ -3399,48 +3739,165 @@ static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) { } static void syncCommand(redisClient *c) { - struct stat sb; - int fd = -1, len; - time_t start = time(NULL); - char sizebuf[32]; - /* ignore SYNC if aleady slave or in monitor mode */ if (c->flags & REDIS_SLAVE) return; - redisLog(REDIS_NOTICE,"Slave ask for syncronization"); - if (flushClientOutput(c) == REDIS_ERR || - rdbSave(server.dbfilename) != REDIS_OK) - goto closeconn; - - fd = open(server.dbfilename, O_RDONLY); - if (fd == -1 || fstat(fd,&sb) == -1) goto closeconn; - len = sb.st_size; - - snprintf(sizebuf,32,"$%d\r\n",len); - if (syncWrite(c->fd,sizebuf,strlen(sizebuf),5) == -1) goto closeconn; - while(len) { - char buf[1024]; - int nread; + /* SYNC can't be issued when the server has pending data to send to + * the client about already issued commands. We need a fresh reply + * buffer registering the differences between the BGSAVE and the current + * dataset, so that we can copy to other slaves if needed. */ + if (listLength(c->reply) != 0) { + addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n")); + return; + } - if (time(NULL)-start > REDIS_MAX_SYNC_TIME) goto closeconn; - nread = read(fd,buf,1024); - if (nread == -1) goto closeconn; - len -= nread; - if (syncWrite(c->fd,buf,nread,5) == -1) goto closeconn; + redisLog(REDIS_NOTICE,"Slave ask for synchronization"); + /* Here we need to check if there is a background saving operation + * in progress, or if it is required to start one */ + if (server.bgsaveinprogress) { + /* Ok a background save is in progress. Let's check if it is a good + * one for replication, i.e. if there is another slave that is + * registering differences since the server forked to save */ + redisClient *slave; + listNode *ln; + + listRewind(server.slaves); + while((ln = listYield(server.slaves))) { + slave = ln->value; + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; + } + if (ln) { + /* Perfect, the server is already registering differences for + * another slave. Set the right state, and copy the buffer. */ + listRelease(c->reply); + c->reply = listDup(slave->reply); + if (!c->reply) oom("listDup copying slave reply list"); + c->replstate = REDIS_REPL_WAIT_BGSAVE_END; + redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); + } else { + /* No way, we need to wait for the next BGSAVE in order to + * register differences */ + c->replstate = REDIS_REPL_WAIT_BGSAVE_START; + redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); + } + } else { + /* Ok we don't have a BGSAVE in progress, let's start one */ + redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); + if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { + redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); + addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n")); + return; + } + c->replstate = REDIS_REPL_WAIT_BGSAVE_END; } - if (syncWrite(c->fd,"\r\n",2,5) == -1) goto closeconn; - close(fd); + c->repldbfd = -1; c->flags |= REDIS_SLAVE; c->slaveseldb = 0; if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail"); - redisLog(REDIS_NOTICE,"Syncronization with slave succeeded"); return; +} -closeconn: - if (fd != -1) close(fd); - c->flags |= REDIS_CLOSE; - redisLog(REDIS_WARNING,"Syncronization with slave failed"); - return; +static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { + redisClient *slave = privdata; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + char buf[REDIS_IOBUF_LEN]; + ssize_t nwritten, buflen; + + if (slave->repldboff == 0) { + /* Write the bulk write count before to transfer the DB. In theory here + * we don't know how much room there is in the output buffer of the + * socket, but in pratice SO_SNDLOWAT (the minimum count for output + * operations) will never be smaller than the few bytes we need. */ + sds bulkcount; + + bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) + slave->repldbsize); + if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount)) + { + sdsfree(bulkcount); + freeClient(slave); + return; + } + sdsfree(bulkcount); + } + lseek(slave->repldbfd,slave->repldboff,SEEK_SET); + buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); + if (buflen <= 0) { + redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", + (buflen == 0) ? "premature EOF" : strerror(errno)); + freeClient(slave); + return; + } + if ((nwritten = write(fd,buf,buflen)) == -1) { + redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s", + strerror(errno)); + freeClient(slave); + return; + } + slave->repldboff += nwritten; + if (slave->repldboff == slave->repldbsize) { + close(slave->repldbfd); + slave->repldbfd = -1; + aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); + slave->replstate = REDIS_REPL_ONLINE; + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, + sendReplyToClient, slave, NULL) == AE_ERR) { + freeClient(slave); + return; + } + addReplySds(slave,sdsempty()); + redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); + } +} + +static void updateSalvesWaitingBgsave(int bgsaveerr) { + listNode *ln; + int startbgsave = 0; + + listRewind(server.slaves); + while((ln = listYield(server.slaves))) { + redisClient *slave = ln->value; + + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { + startbgsave = 1; + slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { + struct stat buf; + + if (bgsaveerr != REDIS_OK) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); + continue; + } + if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 || + fstat(slave->repldbfd,&buf) == -1) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); + continue; + } + slave->repldboff = 0; + slave->repldbsize = buf.st_size; + slave->replstate = REDIS_REPL_SEND_BULK; + aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) { + freeClient(slave); + continue; + } + } + } + if (startbgsave) { + if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { + listRewind(server.slaves); + redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed"); + while((ln = listYield(server.slaves))) { + redisClient *slave = ln->value; + + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) + freeClient(slave); + } + } + } } static int syncWithMaster(void) { @@ -3462,7 +3919,7 @@ static int syncWithMaster(void) { return REDIS_ERR; } /* Read the bulk write count */ - if (syncReadLine(fd,buf,1024,5) == -1) { + if (syncReadLine(fd,buf,1024,3600) == -1) { close(fd); redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s", strerror(errno)); @@ -3517,8 +3974,104 @@ static int syncWithMaster(void) { return REDIS_OK; } +static void slaveofCommand(redisClient *c) { + if (!strcasecmp(c->argv[1]->ptr,"no") && + !strcasecmp(c->argv[2]->ptr,"one")) { + if (server.masterhost) { + sdsfree(server.masterhost); + server.masterhost = NULL; + if (server.master) freeClient(server.master); + server.replstate = REDIS_REPL_NONE; + redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)"); + } + } else { + sdsfree(server.masterhost); + server.masterhost = sdsdup(c->argv[1]->ptr); + server.masterport = atoi(c->argv[2]->ptr); + if (server.master) freeClient(server.master); + server.replstate = REDIS_REPL_CONNECT; + redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", + server.masterhost, server.masterport); + } + addReply(c,shared.ok); +} + +/* ============================ Maxmemory directive ======================== */ + +/* This function gets called when 'maxmemory' is set on the config file to limit + * the max memory used by the server, and we are out of memory. + * This function will try to, in order: + * + * - Free objects from the free list + * - Try to remove keys with an EXPIRE set + * + * It is not possible to free enough memory to reach used-memory < maxmemory + * the server will start refusing commands that will enlarge even more the + * memory usage. + */ +static void freeMemoryIfNeeded(void) { + while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) { + if (listLength(server.objfreelist)) { + robj *o; + + listNode *head = listFirst(server.objfreelist); + o = listNodeValue(head); + listDelNode(server.objfreelist,head); + zfree(o); + } else { + int j, k, freed = 0; + + for (j = 0; j < server.dbnum; j++) { + int minttl = -1; + robj *minkey = NULL; + struct dictEntry *de; + + if (dictSize(server.db[j].expires)) { + freed = 1; + /* From a sample of three keys drop the one nearest to + * the natural expire */ + for (k = 0; k < 3; k++) { + time_t t; + + de = dictGetRandomKey(server.db[j].expires); + t = (time_t) dictGetEntryVal(de); + if (minttl == -1 || t < minttl) { + minkey = dictGetEntryKey(de); + minttl = t; + } + } + deleteKey(server.db+j,minkey); + } + } + if (!freed) return; /* nothing to free... */ + } + } +} + /* =================================== Main! ================================ */ +#ifdef __linux__ +int linuxOvercommitMemoryValue(void) { + FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r"); + char buf[64]; + + if (!fp) return -1; + if (fgets(buf,64,fp) == NULL) { + fclose(fp); + return -1; + } + fclose(fp); + + return atoi(buf); +} + +void linuxOvercommitMemoryWarning(void) { + if (linuxOvercommitMemoryValue() == 0) { + redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts."); + } +} +#endif /* __linux__ */ + static void daemonize(void) { int fd; FILE *fp; @@ -3544,6 +4097,10 @@ static void daemonize(void) { } int main(int argc, char **argv) { +#ifdef __linux__ + linuxOvercommitMemoryWarning(); +#endif + initServerConfig(); if (argc == 2) { ResetServerSaveParams(); @@ -3551,6 +4108,8 @@ int main(int argc, char **argv) { } else if (argc > 2) { fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n"); exit(1); + } else { + redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'"); } initServer(); if (server.daemonize) daemonize();