X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/40d224a9e3df29e27e967ec2f1b9b0ecf66df50f..b5ff27084f593f64d47337c5f47ea3bf76c63a5a:/redis.c?ds=sidebyside diff --git a/redis.c b/redis.c index 31debcde..a8b5e2f2 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "0.091" +#define REDIS_VERSION "0.101" #include "fmacros.h" @@ -49,6 +49,7 @@ #include #include #include +#include #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@ -56,7 +57,8 @@ #include "dict.h" /* Hash tables */ #include "adlist.h" /* Linked lists */ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ -#include "lzf.h" +#include "lzf.h" /* LZF compression library */ +#include "pqsort.h" /* Partial qsort for SORT+LIMIT */ /* Error codes */ #define REDIS_OK 0 @@ -65,9 +67,9 @@ /* Static server configuration */ #define REDIS_SERVERPORT 6379 /* TCP port */ #define REDIS_MAXIDLETIME (60*5) /* default client timeout */ -#define REDIS_QUERYBUF_LEN 1024 +#define REDIS_IOBUF_LEN 1024 #define REDIS_LOADBUF_LEN 1024 -#define REDIS_MAX_ARGS 16 +#define REDIS_STATIC_ARGS 4 #define REDIS_DEFAULT_DBNUM 16 #define REDIS_CONFIGLINE_MAX 1024 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */ @@ -79,8 +81,13 @@ #define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */ /* Command flags */ -#define REDIS_CMD_BULK 1 -#define REDIS_CMD_INLINE 2 +#define REDIS_CMD_BULK 1 /* Bulk write command */ +#define REDIS_CMD_INLINE 2 /* Inline command */ +/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with + this flags will return an error when the 'maxmemory' option is set in the + config file and the server is using more than maxmemory bytes of memory. + In short this commands are denied on low memory conditions. 
*/ +#define REDIS_CMD_DENYOOM 4 /* Object types */ #define REDIS_STRING 0 @@ -183,7 +190,7 @@ typedef struct redisClient { redisDb *db; int dictid; sds querybuf; - robj *argv[REDIS_MAX_ARGS]; + robj **argv; int argc; int bulklen; /* bulk read len. -1 if not in bulk read mode */ list *reply; @@ -194,7 +201,7 @@ typedef struct redisClient { int authenticated; /* when requirepass is non-NULL */ int replstate; /* replication state if this is a slave */ int repldbfd; /* replication DB file descriptor */ - int repldboff; /* replication DB file offset */ + long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ } redisClient; @@ -244,6 +251,8 @@ struct redisServer { int masterport; redisClient *master; /* client that is master for this slave */ int replstate; + unsigned int maxclients; + unsigned int maxmemory; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -304,6 +313,8 @@ static int deleteIfVolatile(redisDb *db, robj *key); static int deleteKey(redisDb *db, robj *key); static time_t getExpire(redisDb *db, robj *key); static int setExpire(redisDb *db, robj *key, time_t when); +static void updateSalvesWaitingBgsave(int bgsaveerr); +static void freeMemoryIfNeeded(void); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); @@ -340,12 +351,15 @@ static void typeCommand(redisClient *c); static void lsetCommand(redisClient *c); static void saddCommand(redisClient *c); static void sremCommand(redisClient *c); +static void smoveCommand(redisClient *c); static void sismemberCommand(redisClient *c); static void scardCommand(redisClient *c); static void sinterCommand(redisClient *c); static void sinterstoreCommand(redisClient *c); static void sunionCommand(redisClient *c); static void sunionstoreCommand(redisClient *c); +static void sdiffCommand(redisClient *c); +static void sdiffstoreCommand(redisClient 
*c); static void syncCommand(redisClient *c); static void flushdbCommand(redisClient *c); static void flushallCommand(redisClient *c); @@ -355,6 +369,10 @@ static void infoCommand(redisClient *c); static void mgetCommand(redisClient *c); static void monitorCommand(redisClient *c); static void expireCommand(redisClient *c); +static void getSetCommand(redisClient *c); +static void ttlCommand(redisClient *c); +static void slaveofCommand(redisClient *c); +static void debugCommand(redisClient *c); /*================================= Globals ================================= */ @@ -362,39 +380,44 @@ static void expireCommand(redisClient *c); static struct redisServer server; /* server global state */ static struct redisCommand cmdTable[] = { {"get",getCommand,2,REDIS_CMD_INLINE}, - {"set",setCommand,3,REDIS_CMD_BULK}, - {"setnx",setnxCommand,3,REDIS_CMD_BULK}, - {"del",delCommand,2,REDIS_CMD_INLINE}, + {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, + {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, + {"del",delCommand,-2,REDIS_CMD_INLINE}, {"exists",existsCommand,2,REDIS_CMD_INLINE}, - {"incr",incrCommand,2,REDIS_CMD_INLINE}, - {"decr",decrCommand,2,REDIS_CMD_INLINE}, + {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, {"mget",mgetCommand,-2,REDIS_CMD_INLINE}, - {"rpush",rpushCommand,3,REDIS_CMD_BULK}, - {"lpush",lpushCommand,3,REDIS_CMD_BULK}, + {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, + {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"rpop",rpopCommand,2,REDIS_CMD_INLINE}, {"lpop",lpopCommand,2,REDIS_CMD_INLINE}, {"llen",llenCommand,2,REDIS_CMD_INLINE}, {"lindex",lindexCommand,3,REDIS_CMD_INLINE}, - {"lset",lsetCommand,4,REDIS_CMD_BULK}, + {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"lrange",lrangeCommand,4,REDIS_CMD_INLINE}, {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE}, {"lrem",lremCommand,4,REDIS_CMD_BULK}, - 
{"sadd",saddCommand,3,REDIS_CMD_BULK}, + {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"srem",sremCommand,3,REDIS_CMD_BULK}, + {"smove",smoveCommand,4,REDIS_CMD_BULK}, {"sismember",sismemberCommand,3,REDIS_CMD_BULK}, {"scard",scardCommand,2,REDIS_CMD_INLINE}, - {"sinter",sinterCommand,-2,REDIS_CMD_INLINE}, - {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE}, - {"sunion",sunionCommand,-2,REDIS_CMD_INLINE}, - {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE}, + {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, {"smembers",sinterCommand,2,REDIS_CMD_INLINE}, - {"incrby",incrbyCommand,3,REDIS_CMD_INLINE}, - {"decrby",decrbyCommand,3,REDIS_CMD_INLINE}, + {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, + {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE}, {"select",selectCommand,2,REDIS_CMD_INLINE}, {"move",moveCommand,3,REDIS_CMD_INLINE}, {"rename",renameCommand,3,REDIS_CMD_INLINE}, {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE}, + {"expire",expireCommand,3,REDIS_CMD_INLINE}, {"keys",keysCommand,2,REDIS_CMD_INLINE}, {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE}, {"auth",authCommand,2,REDIS_CMD_INLINE}, @@ -408,10 +431,12 @@ static struct redisCommand cmdTable[] = { {"sync",syncCommand,1,REDIS_CMD_INLINE}, {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE}, {"flushall",flushallCommand,1,REDIS_CMD_INLINE}, - {"sort",sortCommand,-2,REDIS_CMD_INLINE}, + {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, {"info",infoCommand,1,REDIS_CMD_INLINE}, 
{"monitor",monitorCommand,1,REDIS_CMD_INLINE}, - {"expire",expireCommand,3,REDIS_CMD_INLINE}, + {"ttl",ttlCommand,2,REDIS_CMD_INLINE}, + {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE}, + {"debug",debugCommand,-2,REDIS_CMD_INLINE}, {NULL,NULL,0,0} }; @@ -551,7 +576,12 @@ void redisLog(int level, const char *fmt, ...) va_start(ap, fmt); if (level >= server.verbosity) { char *c = ".-*"; - fprintf(fp,"%c ",c[level]); + char buf[64]; + time_t now; + + now = time(NULL); + strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now)); + fprintf(fp,"%s %c ",buf,c[level]); vfprintf(fp, fmt, ap); fprintf(fp,"\n"); fflush(fp); @@ -633,21 +663,38 @@ static void oom(const char *msg) { /* ====================== Redis server networking stuff ===================== */ void closeTimedoutClients(void) { redisClient *c; - listIter *li; listNode *ln; time_t now = time(NULL); - li = listGetIterator(server.clients,AL_START_HEAD); - if (!li) return; - while ((ln = listNextElement(li)) != NULL) { + listRewind(server.clients); + while ((ln = listYield(server.clients)) != NULL) { c = listNodeValue(ln); if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ + !(c->flags & REDIS_MASTER) && /* no timeout for masters */ (now - c->lastinteraction > server.maxidletime)) { redisLog(REDIS_DEBUG,"Closing idle client"); freeClient(c); } } - listReleaseIterator(li); +} + +/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL + * we resize the hash table to save memory */ +void tryResizeHashTables(void) { + int j; + + for (j = 0; j < server.dbnum; j++) { + long long size, used; + + size = dictSlots(server.db[j].dict); + used = dictSize(server.db[j].dict); + if (size && used && size > REDIS_HT_MINSLOTS && + (used*100/size < REDIS_HT_MINFILL)) { + redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j); + dictResize(server.db[j].dict); + redisLog(REDIS_NOTICE,"Hash table %d resized.",j); + } + } } int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { @@ 
-659,10 +706,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Update the global state with the amount of used memory */ server.usedmemory = zmalloc_used_memory(); - /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL - * we resize the hash table to save memory */ + /* Show some info about non-empty databases */ for (j = 0; j < server.dbnum; j++) { - int size, used, vkeys; + long long size, used, vkeys; size = dictSlots(server.db[j].dict); used = dictSize(server.db[j].dict); @@ -671,14 +717,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size); /* dictPrintStats(server.dict); */ } - if (size && used && size > REDIS_HT_MINSLOTS && - (used*100/size < REDIS_HT_MINFILL)) { - redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j); - dictResize(server.db[j].dict); - redisLog(REDIS_NOTICE,"Hash table %d resized.",j); - } } + /* We don't want to resize the hash tables while a bacground saving + * is in progress: the saving child is created using fork() that is + * implemented with a copy-on-write semantic in most modern systems, so + * if we resize the HT while there is the saving child at work actually + * a lot of memory movements in the parent will cause a lot of pages + * copied. 
*/ + if (!server.bgsaveinprogress) tryResizeHashTables(); + /* Show information about connected clients */ if (!(loops % 5)) { redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use", @@ -689,12 +737,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } /* Close connections of timedout clients */ - if (!(loops % 10)) + if (server.maxidletime && !(loops % 10)) closeTimedoutClients(); /* Check if a background saving in progress terminated */ if (server.bgsaveinprogress) { int statloc; + /* XXX: TODO handle the case of the saving child killed */ if (wait4(-1,&statloc,WNOHANG,NULL)) { int exitcode = WEXITSTATUS(statloc); if (exitcode == 0) { @@ -707,6 +756,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { "Background saving error"); } server.bgsaveinprogress = 0; + updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR); } } else { /* If there is not a background saving in progress check if @@ -823,6 +873,8 @@ static void initServerConfig() { server.dbfilename = "dump.rdb"; server.requirepass = NULL; server.shareobjects = 0; + server.maxclients = 0; + server.maxmemory = 0; ResetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -875,13 +927,22 @@ static void initServer() { } /* Empty the whole database */ -static void emptyDb() { +static long long emptyDb() { int j; + long long removed = 0; for (j = 0; j < server.dbnum; j++) { + removed += dictSize(server.db[j].dict); dictEmpty(server.db[j].dict); dictEmpty(server.db[j].expires); } + return removed; +} + +static int yesnotoi(char *s) { + if (!strcasecmp(s,"yes")) return 1; + else if (!strcasecmp(s,"no")) return 0; + else return -1; } /* I agree, this is a very rudimental way to load a configuration... 
@@ -915,44 +976,44 @@ static void loadServerConfig(char *filename) { sdstolower(argv[0]); /* Execute config directives */ - if (!strcmp(argv[0],"timeout") && argc == 2) { + if (!strcasecmp(argv[0],"timeout") && argc == 2) { server.maxidletime = atoi(argv[1]); - if (server.maxidletime < 1) { + if (server.maxidletime < 0) { err = "Invalid timeout value"; goto loaderr; } - } else if (!strcmp(argv[0],"port") && argc == 2) { + } else if (!strcasecmp(argv[0],"port") && argc == 2) { server.port = atoi(argv[1]); if (server.port < 1 || server.port > 65535) { err = "Invalid port"; goto loaderr; } - } else if (!strcmp(argv[0],"bind") && argc == 2) { + } else if (!strcasecmp(argv[0],"bind") && argc == 2) { server.bindaddr = zstrdup(argv[1]); - } else if (!strcmp(argv[0],"save") && argc == 3) { + } else if (!strcasecmp(argv[0],"save") && argc == 3) { int seconds = atoi(argv[1]); int changes = atoi(argv[2]); if (seconds < 1 || changes < 0) { err = "Invalid save parameters"; goto loaderr; } appendServerSaveParams(seconds,changes); - } else if (!strcmp(argv[0],"dir") && argc == 2) { + } else if (!strcasecmp(argv[0],"dir") && argc == 2) { if (chdir(argv[1]) == -1) { redisLog(REDIS_WARNING,"Can't chdir to '%s': %s", argv[1], strerror(errno)); exit(1); } - } else if (!strcmp(argv[0],"loglevel") && argc == 2) { - if (!strcmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG; - else if (!strcmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE; - else if (!strcmp(argv[1],"warning")) server.verbosity = REDIS_WARNING; + } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) { + if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG; + else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE; + else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING; else { err = "Invalid log level. 
Must be one of debug, notice, warning"; goto loaderr; } - } else if (!strcmp(argv[0],"logfile") && argc == 2) { + } else if (!strcasecmp(argv[0],"logfile") && argc == 2) { FILE *fp; server.logfile = zstrdup(argv[1]); - if (!strcmp(server.logfile,"stdout")) { + if (!strcasecmp(server.logfile,"stdout")) { zfree(server.logfile); server.logfile = NULL; } @@ -967,40 +1028,42 @@ static void loadServerConfig(char *filename) { } fclose(fp); } - } else if (!strcmp(argv[0],"databases") && argc == 2) { + } else if (!strcasecmp(argv[0],"databases") && argc == 2) { server.dbnum = atoi(argv[1]); if (server.dbnum < 1) { err = "Invalid number of databases"; goto loaderr; } - } else if (!strcmp(argv[0],"slaveof") && argc == 3) { + } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) { + server.maxclients = atoi(argv[1]); + } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) { + server.maxmemory = atoi(argv[1]); + } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) { server.masterhost = sdsnew(argv[1]); server.masterport = atoi(argv[2]); server.replstate = REDIS_REPL_CONNECT; - } else if (!strcmp(argv[0],"glueoutputbuf") && argc == 2) { - sdstolower(argv[1]); - if (!strcmp(argv[1],"yes")) server.glueoutputbuf = 1; - else if (!strcmp(argv[1],"no")) server.glueoutputbuf = 0; - else { + } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) { + if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcmp(argv[0],"shareobjects") && argc == 2) { - sdstolower(argv[1]); - if (!strcmp(argv[1],"yes")) server.shareobjects = 1; - else if (!strcmp(argv[1],"no")) server.shareobjects = 0; - else { + } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) { + if ((server.shareobjects = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcmp(argv[0],"daemonize") && argc == 2) { - sdstolower(argv[1]); - if (!strcmp(argv[1],"yes")) server.daemonize = 1; 
- else if (!strcmp(argv[1],"no")) server.daemonize = 0; - else { + } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) { + server.sharingpoolsize = atoi(argv[1]); + if (server.sharingpoolsize < 1) { + err = "invalid object sharing pool size"; goto loaderr; + } + } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) { + if ((server.daemonize = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcmp(argv[0],"requirepass") && argc == 2) { + } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { server.requirepass = zstrdup(argv[1]); - } else if (!strcmp(argv[0],"pidfile") && argc == 2) { + } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) { server.pidfile = zstrdup(argv[1]); + } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) { + server.dbfilename = zstrdup(argv[1]); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -1041,6 +1104,8 @@ static void freeClient(redisClient *c) { assert(ln != NULL); listDelNode(server.clients,ln); if (c->flags & REDIS_SLAVE) { + if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) + close(c->repldbfd); list *l = (c->flags & REDIS_MONITOR) ? 
server.monitors : server.slaves; ln = listSearchKey(l,c); assert(ln != NULL); @@ -1050,18 +1115,19 @@ static void freeClient(redisClient *c) { server.master = NULL; server.replstate = REDIS_REPL_CONNECT; } + zfree(c->argv); zfree(c); } static void glueReplyBuffersIfNeeded(redisClient *c) { int totlen = 0; - listNode *ln = c->reply->head, *next; + listNode *ln; robj *o; - while(ln) { + listRewind(c->reply); + while((ln = listYield(c->reply))) { o = ln->value; totlen += sdslen(o->ptr); - ln = ln->next; /* This optimization makes more sense if we don't have to copy * too much data */ if (totlen > 1024) return; @@ -1070,17 +1136,16 @@ static void glueReplyBuffersIfNeeded(redisClient *c) { char buf[1024]; int copylen = 0; - ln = c->reply->head; - while(ln) { - next = ln->next; + listRewind(c->reply); + while((ln = listYield(c->reply))) { o = ln->value; memcpy(buf+copylen,o->ptr,sdslen(o->ptr)); copylen += sdslen(o->ptr); listDelNode(c->reply,ln); - ln = next; } /* Now the output buffer is empty, add the new single element */ - addReplySds(c,sdsnewlen(buf,totlen)); + o = createObject(REDIS_STRING,sdsnewlen(buf,totlen)); + if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail"); } } @@ -1136,7 +1201,7 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) static struct redisCommand *lookupCommand(char *name) { int j = 0; while(cmdTable[j].name != NULL) { - if (!strcmp(name,cmdTable[j].name)) return &cmdTable[j]; + if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j]; j++; } return NULL; @@ -1160,10 +1225,12 @@ static int processCommand(redisClient *c) { struct redisCommand *cmd; long long dirty; - sdstolower(c->argv[0]->ptr); + /* Free some memory if needed (maxmemory setting) */ + if (server.maxmemory) freeMemoryIfNeeded(); + /* The QUIT command is handled as a special case. 
Normal command * procs are unable to close the client connection safely */ - if (!strcmp(c->argv[0]->ptr,"quit")) { + if (!strcasecmp(c->argv[0]->ptr,"quit")) { freeClient(c); return 0; } @@ -1177,6 +1244,10 @@ static int processCommand(redisClient *c) { addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n")); resetClient(c); return 1; + } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) { + addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n")); + resetClient(c); + return 1; } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { int bulklen = atoi(c->argv[c->argc-1]->ptr); @@ -1231,9 +1302,18 @@ static int processCommand(redisClient *c) { } static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) { - listNode *ln = slaves->head; - robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */ + listNode *ln; int outc = 0, j; + robj **outv; + /* (args*2)+1 is enough room for args, spaces, newlines */ + robj *static_outv[REDIS_STATIC_ARGS*2+1]; + + if (argc <= REDIS_STATIC_ARGS) { + outv = static_outv; + } else { + outv = zmalloc(sizeof(robj*)*(argc*2+1)); + if (!outv) oom("replicationFeedSlaves"); + } for (j = 0; j < argc; j++) { if (j != 0) outv[outc++] = shared.space; @@ -1253,14 +1333,12 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di * be sure to free objects if there is no slave in a replication state * able to be feed with commands */ for (j = 0; j < outc; j++) incrRefCount(outv[j]); - while(ln) { + listRewind(slaves); + while((ln = listYield(slaves))) { redisClient *slave = ln->value; /* Don't feed slaves that are still waiting for BGSAVE to start */ - if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { - ln = ln->next; - continue; - } + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; /* Feed all the other slaves, MONITORs and so on */ if 
(slave->slaveseldb != dictid) { @@ -1287,19 +1365,19 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di slave->slaveseldb = dictid; } for (j = 0; j < outc; j++) addReply(slave,outv[j]); - ln = ln->next; } for (j = 0; j < outc; j++) decrRefCount(outv[j]); + if (outv != static_outv) zfree(outv); } static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = (redisClient*) privdata; - char buf[REDIS_QUERYBUF_LEN]; + char buf[REDIS_IOBUF_LEN]; int nread; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); - nread = read(fd, buf, REDIS_QUERYBUF_LEN); + nread = read(fd, buf, REDIS_IOBUF_LEN); if (nread == -1) { if (errno == EAGAIN) { nread = 0; @@ -1347,9 +1425,14 @@ again: return; } argv = sdssplitlen(query,sdslen(query)," ",1,&argc); - sdsfree(query); if (argv == NULL) oom("sdssplitlen"); - for (j = 0; j < argc && j < REDIS_MAX_ARGS; j++) { + sdsfree(query); + + if (c->argv) zfree(c->argv); + c->argv = zmalloc(sizeof(robj*)*argc); + if (c->argv == NULL) oom("allocating arguments list for client"); + + for (j = 0; j < argc; j++) { if (sdslen(argv[j])) { c->argv[c->argc] = createObject(REDIS_STRING,argv[j]); c->argc++; @@ -1363,7 +1446,7 @@ again: * on the query buffer try to process the next command. 
*/ if (processCommand(c) && sdslen(c->querybuf)) goto again; return; - } else if (sdslen(c->querybuf) >= 1024) { + } else if (sdslen(c->querybuf) >= 1024*32) { redisLog(REDIS_DEBUG, "Client protocol error"); freeClient(c); return; @@ -1408,6 +1491,7 @@ static redisClient *createClient(int fd) { c->fd = fd; c->querybuf = sdsempty(); c->argc = 0; + c->argv = NULL; c->bulklen = -1; c->sentlen = 0; c->flags = 0; @@ -1428,6 +1512,8 @@ static redisClient *createClient(int fd) { static void addReply(redisClient *c, robj *obj) { if (listLength(c->reply) == 0 && + (c->replstate == REDIS_REPL_NONE || + c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c, NULL) == AE_ERR) return; if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail"); @@ -1443,6 +1529,7 @@ static void addReplySds(redisClient *c, sds s) { static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd; char cip[128]; + redisClient *c; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); @@ -1453,11 +1540,23 @@ static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport); - if (createClient(cfd) == NULL) { + if ((c = createClient(cfd)) == NULL) { redisLog(REDIS_WARNING,"Error allocating resoures for the client"); close(cfd); /* May be already closed, just ingore errors */ return; } + /* If maxclient directive is set and this is one client more... close the + * connection. 
Note that we create the client instead to check before + * for this condition, since now the socket is already set in nonblocking + * mode and we can send an error for free using the Kernel I/O */ + if (server.maxclients && listLength(server.clients) > server.maxclients) { + char *err = "-ERR max number of clients reached\r\n"; + + /* That's a best effort error message, don't check write errors */ + (void) write(c->fd,err,strlen(err)); + freeClient(c); + return; + } server.stat_numconnections++; } @@ -1700,7 +1799,7 @@ static int rdbSaveLzfStringObject(FILE *fp, robj *obj) { /* We require at least four bytes compression for this to be worth it */ outlen = sdslen(obj->ptr)-4; if (outlen <= 0) return 0; - if ((out = zmalloc(outlen)) == NULL) return 0; + if ((out = zmalloc(outlen+1)) == NULL) return 0; comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen); if (comprlen == 0) { zfree(out); @@ -1737,7 +1836,7 @@ static int rdbSaveStringObject(FILE *fp, robj *obj) { /* Try LZF compression - under 20 bytes it's unable to compress even * aaaaaaaaaaaaaaaaaa so skip it */ - if (len > 20) { + if (1 && len > 20) { int retval; retval = rdbSaveLzfStringObject(fp,obj); @@ -1804,14 +1903,14 @@ static int rdbSave(char *filename) { } else if (o->type == REDIS_LIST) { /* Save a list value */ list *list = o->ptr; - listNode *ln = list->head; + listNode *ln; + listRewind(list); if (rdbSaveLen(fp,listLength(list)) == -1) goto werr; - while(ln) { + while((ln = listYield(list))) { robj *eleobj = listNodeValue(ln); if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; - ln = ln->next; } } else if (o->type == REDIS_SET) { /* Save a set value */ @@ -1875,6 +1974,11 @@ static int rdbSaveBackground(char *filename) { } } else { /* Parent */ + if (childpid == -1) { + redisLog(REDIS_WARNING,"Can't save in background: fork: %s", + strerror(errno)); + return REDIS_ERR; + } redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid); server.bgsaveinprogress = 1; return REDIS_OK; 
@@ -1966,6 +2070,7 @@ static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) { if ((val = sdsnewlen(NULL,len)) == NULL) goto err; if (fread(c,clen,1,fp) == 0) goto err; if (lzf_decompress(c,clen,val,len) == 0) goto err; + zfree(c); return createObject(REDIS_STRING,val); err: zfree(c); @@ -2173,6 +2278,18 @@ static void getCommand(redisClient *c) { } } +static void getSetCommand(redisClient *c) { + getCommand(c); + if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) { + dictReplace(c->db->dict,c->argv[1],c->argv[2]); + } else { + incrRefCount(c->argv[1]); + } + incrRefCount(c->argv[2]); + server.dirty++; + removeExpire(c->db,c->argv[1]); +} + static void mgetCommand(redisClient *c) { int j; @@ -2193,7 +2310,7 @@ static void mgetCommand(redisClient *c) { } } -static void incrDecrCommand(redisClient *c, int incr) { +static void incrDecrCommand(redisClient *c, long long incr) { long long value; int retval; robj *o; @@ -2235,23 +2352,36 @@ static void decrCommand(redisClient *c) { } static void incrbyCommand(redisClient *c) { - int incr = atoi(c->argv[2]->ptr); + long long incr = strtoll(c->argv[2]->ptr, NULL, 10); incrDecrCommand(c,incr); } static void decrbyCommand(redisClient *c) { - int incr = atoi(c->argv[2]->ptr); + long long incr = strtoll(c->argv[2]->ptr, NULL, 10); incrDecrCommand(c,-incr); } /* ========================= Type agnostic commands ========================= */ static void delCommand(redisClient *c) { - if (deleteKey(c->db,c->argv[1])) { - server.dirty++; - addReply(c,shared.cone); - } else { + int deleted = 0, j; + + for (j = 1; j < c->argc; j++) { + if (deleteKey(c->db,c->argv[j])) { + server.dirty++; + deleted++; + } + } + switch(deleted) { + case 0: addReply(c,shared.czero); + break; + case 1: + addReply(c,shared.cone); + break; + default: + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted)); + break; } } @@ -2373,10 +2503,12 @@ static void bgsaveCommand(redisClient *c) { static void shutdownCommand(redisClient *c) { 
redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); + /* XXX: TODO kill the child if there is a bgsave in progress */ if (rdbSave(server.dbfilename) == REDIS_OK) { if (server.daemonize) { - unlink(server.pidfile); + unlink(server.pidfile); } + redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); redisLog(REDIS_WARNING,"Server exit now, bye bye..."); exit(1); } else { @@ -2804,6 +2936,41 @@ static void sremCommand(redisClient *c) { } } +static void smoveCommand(redisClient *c) { + robj *srcset, *dstset; + + srcset = lookupKeyWrite(c->db,c->argv[1]); + dstset = lookupKeyWrite(c->db,c->argv[2]); + + /* If the source key does not exist return 0, if it's of the wrong type + * raise an error */ + if (srcset == NULL || srcset->type != REDIS_SET) { + addReply(c, srcset ? shared.wrongtypeerr : shared.czero); + return; + } + /* Error if the destination key is not a set as well */ + if (dstset && dstset->type != REDIS_SET) { + addReply(c,shared.wrongtypeerr); + return; + } + /* Remove the element from the source set */ + if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) { + /* Key not found in the src set! 
return zero */ + addReply(c,shared.czero); + return; + } + server.dirty++; + /* Add the element to the destination set */ + if (!dstset) { + dstset = createSetObject(); + dictAdd(c->db->dict,c->argv[2],dstset); + incrRefCount(c->argv[2]); + } + if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK) + incrRefCount(c->argv[3]); + addReply(c,shared.cone); +} + static void sismemberCommand(redisClient *c) { robj *set; @@ -2854,7 +3021,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r robj *lenobj = NULL, *dstset = NULL; int j, cardinality = 0; - if (!dv) oom("sinterCommand"); + if (!dv) oom("sinterGenericCommand"); for (j = 0; j < setsnum; j++) { robj *setobj; @@ -2895,9 +3062,6 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r /* If we have a target key where to store the resulting set * create this key with an empty set inside */ dstset = createSetObject(); - deleteKey(c->db,dstkey); - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); } /* Iterate all the elements of the first (smallest) set, and test @@ -2926,10 +3090,18 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r } dictReleaseIterator(di); + if (dstkey) { + /* Store the resulting set into the target */ + deleteKey(c->db,dstkey); + dictAdd(c->db->dict,dstkey,dstset); + incrRefCount(dstkey); + } + if (!dstkey) { lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality); } else { - addReply(c,shared.ok); + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", + dictSize((dict*)dstset->ptr))); server.dirty++; } zfree(dv); @@ -2943,14 +3115,17 @@ static void sinterstoreCommand(redisClient *c) { sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]); } -static void sunionGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) { +#define REDIS_OP_UNION 0 +#define REDIS_OP_DIFF 1 + +static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) { 
dict **dv = zmalloc(sizeof(dict*)*setsnum); dictIterator *di; dictEntry *de; - robj *lenobj = NULL, *dstset = NULL; + robj *dstset = NULL; int j, cardinality = 0; - if (!dv) oom("sunionCommand"); + if (!dv) oom("sunionDiffGenericCommand"); for (j = 0; j < setsnum; j++) { robj *setobj; @@ -2974,27 +3149,10 @@ static void sunionGenericCommand(redisClient *c, robj **setskeys, int setsnum, r * this set object will be the resulting object to set into the target key*/ dstset = createSetObject(); - /* The first thing we should output is the total number of elements... - * since this is a multi-bulk write, but at this stage we don't know - * the intersection set size, so we use a trick, append an empty object - * to the output list and save the pointer to later modify it with the - * right length */ - if (!dstkey) { - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); - } else { - /* If we have a target key where to store the resulting set - * create this key with an empty set inside */ - deleteKey(c->db,dstkey); - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); - server.dirty++; - } - /* Iterate all the elements of all the sets, add every element a single * time to the result set */ for (j = 0; j < setsnum; j++) { + if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */ if (!dv[j]) continue; /* non existing keys are like empty sets */ di = dictGetIterator(dv[j]); @@ -3005,51 +3163,84 @@ static void sunionGenericCommand(redisClient *c, robj **setskeys, int setsnum, r /* dictAdd will not add the same element multiple times */ ele = dictGetEntryKey(de); - if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) { - incrRefCount(ele); - if (!dstkey) { - addReplySds(c,sdscatprintf(sdsempty(), - "$%d\r\n",sdslen(ele->ptr))); - addReply(c,ele); - addReply(c,shared.crlf); + if (op == REDIS_OP_UNION || j == 0) { + if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) { + incrRefCount(ele); cardinality++; } + } else if (op == 
REDIS_OP_DIFF) { + if (dictDelete(dstset->ptr,ele) == DICT_OK) { + cardinality--; + } } } dictReleaseIterator(di); + + if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */ } + /* Output the content of the resulting set, if not in STORE mode */ + if (!dstkey) { + addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality)); + di = dictGetIterator(dstset->ptr); + if (!di) oom("dictGetIterator"); + while((de = dictNext(di)) != NULL) { + robj *ele; + + ele = dictGetEntryKey(de); + addReplySds(c,sdscatprintf(sdsempty(), + "$%d\r\n",sdslen(ele->ptr))); + addReply(c,ele); + addReply(c,shared.crlf); + } + dictReleaseIterator(di); + } else { + /* If we have a target key where to store the resulting set + * create this key with the result set inside */ + deleteKey(c->db,dstkey); + dictAdd(c->db->dict,dstkey,dstset); + incrRefCount(dstkey); + } + + /* Cleanup */ if (!dstkey) { - lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality); decrRefCount(dstset); } else { - addReply(c,shared.ok); + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", + dictSize((dict*)dstset->ptr))); server.dirty++; } zfree(dv); } static void sunionCommand(redisClient *c) { - sunionGenericCommand(c,c->argv+1,c->argc-1,NULL); + sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION); } static void sunionstoreCommand(redisClient *c) { - sunionGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]); + sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION); +} + +static void sdiffCommand(redisClient *c) { + sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF); +} + +static void sdiffstoreCommand(redisClient *c) { + sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF); } static void flushdbCommand(redisClient *c) { + server.dirty += dictSize(c->db->dict); dictEmpty(c->db->dict); dictEmpty(c->db->expires); - server.dirty++; addReply(c,shared.ok); - rdbSave(server.dbfilename); } static void flushallCommand(redisClient *c) 
{ - emptyDb(); - server.dirty++; + server.dirty += emptyDb(); addReply(c,shared.ok); rdbSave(server.dbfilename); + server.dirty++; } redisSortOperation *createSortOperation(int type, robj *pattern) { @@ -3224,13 +3415,14 @@ static void sortCommand(redisClient *c) { j = 0; if (sortval->type == REDIS_LIST) { list *list = sortval->ptr; - listNode *ln = list->head; - while(ln) { + listNode *ln; + + listRewind(list); + while((ln = listYield(list))) { robj *ele = ln->value; vector[j].obj = ele; vector[j].u.score = 0; vector[j].u.cmpobj = NULL; - ln = ln->next; j++; } } else { @@ -3284,7 +3476,10 @@ static void sortCommand(redisClient *c) { server.sort_desc = desc; server.sort_alpha = alpha; server.sort_bypattern = sortby ? 1 : 0; - qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare); + if (sortby && (start != 0 || end != vectorlen-1)) + pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end); + else + qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare); } /* Send command output to the output buffer, performing the specified @@ -3292,14 +3487,15 @@ static void sortCommand(redisClient *c) { outputlen = getop ? 
getop*(end-start+1) : end-start+1; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen)); for (j = start; j <= end; j++) { - listNode *ln = operations->head; + listNode *ln; if (!getop) { addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n", sdslen(vector[j].obj->ptr))); addReply(c,vector[j].obj); addReply(c,shared.crlf); } - while(ln) { + listRewind(operations); + while((ln = listYield(operations))) { redisSortOperation *sop = ln->value; robj *val = lookupKeyByPattern(c->db,sop->pattern, vector[j].obj); @@ -3316,7 +3512,6 @@ static void sortCommand(redisClient *c) { } else if (sop->type == REDIS_SORT_DEL) { /* TODO */ } - ln = ln->next; } } @@ -3336,26 +3531,43 @@ static void infoCommand(redisClient *c) { info = sdscatprintf(sdsempty(), "redis_version:%s\r\n" + "uptime_in_seconds:%d\r\n" + "uptime_in_days:%d\r\n" "connected_clients:%d\r\n" "connected_slaves:%d\r\n" "used_memory:%zu\r\n" "changes_since_last_save:%lld\r\n" + "bgsave_in_progress:%d\r\n" "last_save_time:%d\r\n" "total_connections_received:%lld\r\n" "total_commands_processed:%lld\r\n" - "uptime_in_seconds:%d\r\n" - "uptime_in_days:%d\r\n" + "role:%s\r\n" ,REDIS_VERSION, + uptime, + uptime/(3600*24), listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), server.usedmemory, server.dirty, + server.bgsaveinprogress, server.lastsave, server.stat_numconnections, server.stat_numcommands, - uptime, - uptime/(3600*24) + server.masterhost == NULL ? "master" : "slave" ); + if (server.masterhost) { + info = sdscatprintf(info, + "master_host:%s\r\n" + "master_port:%d\r\n" + "master_link_status:%s\r\n" + "master_last_io_seconds_ago:%d\r\n" + ,server.masterhost, + server.masterport, + (server.replstate == REDIS_REPL_CONNECTED) ? 
+ "up" : "down", + (int)(time(NULL)-server.master->lastinteraction) + ); + } addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info))); addReplySds(c,info); addReply(c,shared.crlf); @@ -3453,25 +3665,20 @@ static void expireCommand(redisClient *c) { } } -/* =============================== Replication ============================= */ - -/* Send the whole output buffer syncronously to the slave. This a general operation in theory, but it is actually useful only for replication. */ -static int flushClientOutput(redisClient *c) { - int retval; - time_t start = time(NULL); +static void ttlCommand(redisClient *c) { + time_t expire; + int ttl = -1; - while(listLength(c->reply)) { - if (time(NULL)-start > 5) return REDIS_ERR; /* 5 seconds timeout */ - retval = aeWait(c->fd,AE_WRITABLE,1000); - if (retval == -1) { - return REDIS_ERR; - } else if (retval & AE_WRITABLE) { - sendReplyToClient(NULL, c->fd, c, AE_WRITABLE); - } + expire = getExpire(c->db,c->argv[1]); + if (expire != -1) { + ttl = (int) (expire-time(NULL)); + if (ttl < 0) ttl = -1; } - return REDIS_OK; + addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl)); } +/* =============================== Replication ============================= */ + static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) { ssize_t nwritten, ret = size; time_t start = time(NULL); @@ -3557,11 +3764,10 @@ static void syncCommand(redisClient *c) { redisClient *slave; listNode *ln; - ln = server.slaves->head; - while(ln) { + listRewind(server.slaves); + while((ln = listYield(server.slaves))) { slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; - ln = ln->next; } if (ln) { /* Perfect, the server is already registering differences for @@ -3587,59 +3793,115 @@ static void syncCommand(redisClient *c) { } c->replstate = REDIS_REPL_WAIT_BGSAVE_END; } + c->repldbfd = -1; c->flags |= REDIS_SLAVE; c->slaveseldb = 0; if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail"); - 
redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); return; } -#if 0 -static void _syncCommand(redisClient *c) { - struct stat sb; - int fd = -1, len; - time_t start = time(NULL); - char sizebuf[32]; - - /* ignore SYNC if aleady slave or in monitor mode */ - if (c->flags & REDIS_SLAVE) return; - - redisLog(REDIS_NOTICE,"Slave ask for synchronization"); - if (flushClientOutput(c) == REDIS_ERR || - rdbSave(server.dbfilename) != REDIS_OK) - goto closeconn; +static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { + redisClient *slave = privdata; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + char buf[REDIS_IOBUF_LEN]; + ssize_t nwritten, buflen; + + if (slave->repldboff == 0) { + /* Write the bulk write count before transferring the DB. In theory here + * we don't know how much room there is in the output buffer of the + * socket, but in practice SO_SNDLOWAT (the minimum count for output + * operations) will never be smaller than the few bytes we need. */ + sds bulkcount; + + bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) + slave->repldbsize); + if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount)) + { + sdsfree(bulkcount); + freeClient(slave); + return; + } + sdsfree(bulkcount); + } + lseek(slave->repldbfd,slave->repldboff,SEEK_SET); + buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN); + if (buflen <= 0) { + redisLog(REDIS_WARNING,"Read error sending DB to slave: %s", + (buflen == 0) ? 
"premature EOF" : strerror(errno)); + freeClient(slave); + return; + } + if ((nwritten = write(fd,buf,buflen)) == -1) { + redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s", + strerror(errno)); + freeClient(slave); + return; + } + slave->repldboff += nwritten; + if (slave->repldboff == slave->repldbsize) { + close(slave->repldbfd); + slave->repldbfd = -1; + aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); + slave->replstate = REDIS_REPL_ONLINE; + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, + sendReplyToClient, slave, NULL) == AE_ERR) { + freeClient(slave); + return; + } + addReplySds(slave,sdsempty()); + redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); + } +} - fd = open(server.dbfilename, O_RDONLY); - if (fd == -1 || fstat(fd,&sb) == -1) goto closeconn; - len = sb.st_size; +static void updateSalvesWaitingBgsave(int bgsaveerr) { + listNode *ln; + int startbgsave = 0; - snprintf(sizebuf,32,"$%d\r\n",len); - if (syncWrite(c->fd,sizebuf,strlen(sizebuf),5) == -1) goto closeconn; - while(len) { - char buf[1024]; - int nread; + listRewind(server.slaves); + while((ln = listYield(server.slaves))) { + redisClient *slave = ln->value; - if (time(NULL)-start > REDIS_MAX_SYNC_TIME) goto closeconn; - nread = read(fd,buf,1024); - if (nread == -1) goto closeconn; - len -= nread; - if (syncWrite(c->fd,buf,nread,5) == -1) goto closeconn; + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) { + startbgsave = 1; + slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; + } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { + struct stat buf; + + if (bgsaveerr != REDIS_OK) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); + continue; + } + if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 || + fstat(slave->repldbfd,&buf) == -1) { + freeClient(slave); + redisLog(REDIS_WARNING,"SYNC failed. 
Can't open/stat DB after BGSAVE: %s", strerror(errno)); + continue; + } + slave->repldboff = 0; + slave->repldbsize = buf.st_size; + slave->replstate = REDIS_REPL_SEND_BULK; + aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) { + freeClient(slave); + continue; + } + } } - if (syncWrite(c->fd,"\r\n",2,5) == -1) goto closeconn; - close(fd); - c->flags |= REDIS_SLAVE; - c->slaveseldb = 0; - if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail"); - redisLog(REDIS_NOTICE,"Synchronization with slave succeeded"); - return; + if (startbgsave) { + if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { + listRewind(server.slaves); + redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed"); + while((ln = listYield(server.slaves))) { + redisClient *slave = ln->value; -closeconn: - if (fd != -1) close(fd); - c->flags |= REDIS_CLOSE; - redisLog(REDIS_WARNING,"Synchronization with slave failed"); - return; + if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) + freeClient(slave); + } + } + } } -#endif static int syncWithMaster(void) { char buf[1024], tmpfile[256]; @@ -3660,7 +3922,7 @@ static int syncWithMaster(void) { return REDIS_ERR; } /* Read the bulk write count */ - if (syncReadLine(fd,buf,1024,5) == -1) { + if (syncReadLine(fd,buf,1024,3600) == -1) { close(fd); redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s", strerror(errno)); @@ -3715,8 +3977,114 @@ static int syncWithMaster(void) { return REDIS_OK; } +static void slaveofCommand(redisClient *c) { + if (!strcasecmp(c->argv[1]->ptr,"no") && + !strcasecmp(c->argv[2]->ptr,"one")) { + if (server.masterhost) { + sdsfree(server.masterhost); + server.masterhost = NULL; + if (server.master) freeClient(server.master); + server.replstate = REDIS_REPL_NONE; + redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)"); + } + } else { + sdsfree(server.masterhost); + server.masterhost = 
sdsdup(c->argv[1]->ptr); + server.masterport = atoi(c->argv[2]->ptr); + if (server.master) freeClient(server.master); + server.replstate = REDIS_REPL_CONNECT; + redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", + server.masterhost, server.masterport); + } + addReply(c,shared.ok); +} + +/* ============================ Maxmemory directive ======================== */ + +/* This function gets called when 'maxmemory' is set in the config file to limit + * the max memory used by the server, and we are out of memory. + * This function will try to, in order: + * + * - Free objects from the free list + * - Try to remove keys with an EXPIRE set + * + * If it is not possible to free enough memory to reach used-memory < maxmemory, + * the server will start refusing commands that would enlarge the memory usage + * even more. + */ +static void freeMemoryIfNeeded(void) { + while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) { + if (listLength(server.objfreelist)) { + robj *o; + + listNode *head = listFirst(server.objfreelist); + o = listNodeValue(head); + listDelNode(server.objfreelist,head); + zfree(o); + } else { + int j, k, freed = 0; + + for (j = 0; j < server.dbnum; j++) { + int minttl = -1; + robj *minkey = NULL; + struct dictEntry *de; + + if (dictSize(server.db[j].expires)) { + freed = 1; + /* From a sample of three keys drop the one nearest to + * the natural expire */ + for (k = 0; k < 3; k++) { + time_t t; + + de = dictGetRandomKey(server.db[j].expires); + t = (time_t) dictGetEntryVal(de); + if (minttl == -1 || t < minttl) { + minkey = dictGetEntryKey(de); + minttl = t; + } + } + deleteKey(server.db+j,minkey); + } + } + if (!freed) return; /* nothing to free... 
*/ + } + } +} + +/* ================================= Debugging ============================== */ + +static void debugCommand(redisClient *c) { + if (!strcasecmp(c->argv[1]->ptr,"segfault")) { + *((char*)-1) = 'x'; + } else { + addReplySds(c,sdsnew("-ERR Syntax error, try DEBUG SEGFAULT\r\n")); + } +} + /* =================================== Main! ================================ */ +#ifdef __linux__ +int linuxOvercommitMemoryValue(void) { + FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r"); + char buf[64]; + + if (!fp) return -1; + if (fgets(buf,64,fp) == NULL) { + fclose(fp); + return -1; + } + fclose(fp); + + return atoi(buf); +} + +void linuxOvercommitMemoryWarning(void) { + if (linuxOvercommitMemoryValue() == 0) { + redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts."); + } +} +#endif /* __linux__ */ + static void daemonize(void) { int fd; FILE *fp; @@ -3742,6 +4110,10 @@ static void daemonize(void) { } int main(int argc, char **argv) { +#ifdef __linux__ + linuxOvercommitMemoryWarning(); +#endif + initServerConfig(); if (argc == 2) { ResetServerSaveParams(); @@ -3749,6 +4121,8 @@ int main(int argc, char **argv) { } else if (argc > 2) { fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n"); exit(1); + } else { + redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'"); } initServer(); if (server.daemonize) daemonize();