X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/3a401464e9c42fafeaa9405db60d10b3256aca51..b0aa9bc8cdd36925b2d8e5d3a9c1e1476eaa6fc1:/src/redis.c diff --git a/src/redis.c b/src/redis.c index 4cc378e2..eed21263 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009-2010, Salvatore Sanfilippo + * Copyright (c) 2009-2012, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -49,6 +49,7 @@ #include #include #include +#include /* Our shared "common" objects */ @@ -106,6 +107,11 @@ struct redisCommand *commandTable; * results. For instance SPOP and RANDOMKEY are two random commands. * S: Sort command output array if called from script, so that the output * is deterministic. + * l: Allow command while loading the database. + * t: Allow command while a slave has stale data but is not allowed to + * server this data. Normally no command is accepted in this condition + * but just a few. + * M: Do not automatically propagate the command on MONITOR. */ struct redisCommand redisCommandTable[] = { {"get",getCommand,2,"r",0,NULL,1,1,1,0,0}, @@ -148,7 +154,7 @@ struct redisCommand redisCommandTable[] = { {"sismember",sismemberCommand,3,"r",0,NULL,1,1,1,0,0}, {"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0}, {"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0}, - {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0}, + {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0}, {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0}, {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0}, @@ -212,25 +218,26 @@ struct redisCommand redisCommandTable[] = { {"lastsave",lastsaveCommand,1,"r",0,NULL,0,0,0,0,0}, {"type",typeCommand,2,"r",0,NULL,1,1,1,0,0}, {"multi",multiCommand,1,"rs",0,NULL,0,0,0,0,0}, - {"exec",execCommand,1,"s",0,NULL,0,0,0,0,0}, + {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0}, {"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0}, {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, + {"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0}, {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0}, {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0}, - {"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0}, - {"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0}, + {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0}, + {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0}, {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0}, {"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0}, {"pttl",pttlCommand,2,"r",0,NULL,1,1,1,0,0}, {"persist",persistCommand,2,"w",0,NULL,1,1,1,0,0}, - {"slaveof",slaveofCommand,3,"as",0,NULL,0,0,0,0,0}, + {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0}, {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0}, {"config",configCommand,-2,"ar",0,NULL,0,0,0,0,0}, - {"subscribe",subscribeCommand,-2,"rps",0,NULL,0,0,0,0,0}, - {"unsubscribe",unsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0}, - {"psubscribe",psubscribeCommand,-2,"rps",0,NULL,0,0,0,0,0}, - {"punsubscribe",punsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0}, - {"publish",publishCommand,3,"pf",0,NULL,0,0,0,0,0}, + {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, + {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, + {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}, + {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0}, + {"publish",publishCommand,3,"pflt",0,NULL,0,0,0,0,0}, {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0}, {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0}, @@ -242,7 +249,9 @@ struct redisCommand redisCommandTable[] = { {"evalsha",evalShaCommand,-3,"s",0,zunionInterGetKeys,0,0,0,0,0}, {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0}, {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}, - {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0} + {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}, + {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0}, + {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0} }; /*============================ Utility functions ============================ */ @@ -325,17 +334,6 @@ err: if (server.logfile) close(fd); } -/* Redis generally does not try to recover from out of memory conditions - * when allocating objects or strings, it is not clear if it will be possible - * to report this condition to the client since the networking layer itself - * is based on heap allocation for send buffers, so we simply abort. - * At least the code will be simpler to read... */ -void oom(const char *msg) { - redisLog(REDIS_WARNING, "%s: Out of memory\n",msg); - sleep(1); - abort(); -} - /* Return the UNIX time in microseconds */ long long ustime(void) { struct timeval tv; @@ -394,7 +392,8 @@ int dictSdsKeyCompare(void *privdata, const void *key1, return memcmp(key1, key2, l1) == 0; } -/* A case insensitive version used for the command lookup table. */ +/* A case insensitive version used for the command lookup table and other + * places where case insensitive non binary-safe comparison is needed. */ int dictSdsKeyCaseCompare(void *privdata, const void *key1, const void *key2) { @@ -509,6 +508,16 @@ dictType dbDictType = { dictRedisObjectDestructor /* val destructor */ }; +/* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */ +dictType shaScriptObjectDictType = { + dictSdsCaseHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCaseCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictRedisObjectDestructor /* val destructor */ +}; + /* Db->expires */ dictType keyptrDictType = { dictSdsHash, /* hash function */ @@ -581,10 +590,16 @@ void incrementallyRehash(void) { int j; for (j = 0; j < server.dbnum; j++) { + /* Keys dictionary */ if (dictIsRehashing(server.db[j].dict)) { dictRehashMilliseconds(server.db[j].dict,1); break; /* already used our millisecond for this loop... */ } + /* Expires */ + if (dictIsRehashing(server.db[j].expires)) { + dictRehashMilliseconds(server.db[j].expires,1); + break; /* already used our millisecond for this loop... */ + } } } @@ -608,19 +623,35 @@ void updateDictResizePolicy(void) { * it will get more aggressive to avoid that too much memory is used by * keys that can be removed from the keyspace. */ void activeExpireCycle(void) { - int j; - long long start = mstime(); + int j, iteration = 0; + long long start = ustime(), timelimit; + + /* We can use at max REDIS_EXPIRELOOKUPS_TIME_PERC percentage of CPU time + * per iteration. Since this function gets called with a frequency of + * REDIS_HZ times per second, the following is the max amount of + * microseconds we can spend in this function. */ + timelimit = 1000000*REDIS_EXPIRELOOKUPS_TIME_PERC/REDIS_HZ/100; + if (timelimit <= 0) timelimit = 1; for (j = 0; j < server.dbnum; j++) { - int expired, iteration = 0; + int expired; redisDb *db = server.db+j; /* Continue to expire if at the end of the cycle more than 25% * of the keys were expired. */ do { - long num = dictSize(db->expires); + unsigned long num = dictSize(db->expires); + unsigned long slots = dictSlots(db->expires); long long now = mstime(); + /* When there are less than 1% filled slots getting random + * keys is expensive, so stop here waiting for better times... + * The dictionary will be resized asap. */ + if (num && slots > DICT_HT_INITIAL_SIZE && + (num*100/slots < 1)) break; + + /* The main collection cycle. Sample random keys among keys + * with an expire set, checking for expired ones. */ expired = 0; if (num > REDIS_EXPIRELOOKUPS_PER_CRON) num = REDIS_EXPIRELOOKUPS_PER_CRON; @@ -645,8 +676,8 @@ void activeExpireCycle(void) { * expire. So after a given amount of milliseconds return to the * caller waiting for the other active expire cycle. */ iteration++; - if ((iteration & 0xff) == 0 && /* & 0xff is the same as % 255 */ - (mstime()-start) > REDIS_EXPIRELOOKUPS_TIME_LIMIT) return; + if ((iteration & 0xf) == 0 && /* check once every 16 cycles. */ + (ustime()-start) > timelimit) return; } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); } } @@ -732,13 +763,13 @@ int clientsCronResizeQueryBuffer(redisClient *c) { } void clientsCron(void) { - /* Make sure to process at least 1/100 of clients per call. - * Since this function is called 10 times per second we are sure that + /* Make sure to process at least 1/(REDIS_HZ*10) of clients per call. + * Since this function is called REDIS_HZ times per second we are sure that * in the worst case we process all the clients in 10 seconds. * In normal conditions (a reasonable number of clients) we process * all the clients in a shorter time. */ int numclients = listLength(server.clients); - int iterations = numclients/100; + int iterations = numclients/(REDIS_HZ*10); if (iterations < 50) iterations = (numclients < 50) ? numclients : 50; @@ -760,8 +791,27 @@ void clientsCron(void) { } } +/* This is our timer interrupt, called REDIS_HZ times per second. + * Here is where we do a number of things that need to be done asynchronously. + * For instance: + * + * - Active expired keys collection (it is also performed in a lazy way on + * lookup). + * - Software watchdong. + * - Update some statistic. + * - Incremental rehashing of the DBs hash tables. + * - Triggering BGSAVE / AOF rewrite, and handling of terminated children. + * - Clients timeout of differnet kinds. + * - Replication reconnection. + * - Many more... + * + * Everything directly called here will be called REDIS_HZ times per second, + * so in order to throttle execution of things we want to do less frequently + * a macro is used: run_with_period(milliseconds) { .... } + */ + int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { - int j, loops = server.cronloops; + int j; REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(id); REDIS_NOTUSED(clientData); @@ -776,7 +826,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * To access a global var is faster than calling time(NULL) */ server.unixtime = time(NULL); - trackOperationsPerSecond(); + run_with_period(100) trackOperationsPerSecond(); /* We have just 22 bits per object for LRU information. * So we use an (eventually wrapping) LRU clock with 10 seconds resolution. @@ -804,15 +854,17 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } /* Show some info about non-empty databases */ - for (j = 0; j < server.dbnum; j++) { - long long size, used, vkeys; - - size = dictSlots(server.db[j].dict); - used = dictSize(server.db[j].dict); - vkeys = dictSize(server.db[j].expires); - if (!(loops % 50) && (used || vkeys)) { - redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); - /* dictPrintStats(server.dict); */ + run_with_period(5000) { + for (j = 0; j < server.dbnum; j++) { + long long size, used, vkeys; + + size = dictSlots(server.db[j].dict); + used = dictSize(server.db[j].dict); + vkeys = dictSize(server.db[j].expires); + if (used || vkeys) { + redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); + /* dictPrintStats(server.dict); */ + } } } @@ -823,16 +875,19 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * a lot of memory movements in the parent will cause a lot of pages * copied. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { - if (!(loops % 10)) tryResizeHashTables(); + tryResizeHashTables(); if (server.activerehashing) incrementallyRehash(); } /* Show information about connected clients */ - if (!(loops % 50)) { - redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use", - listLength(server.clients)-listLength(server.slaves), - listLength(server.slaves), - zmalloc_used_memory()); + if (!server.sentinel_mode) { + run_with_period(5000) { + redisLog(REDIS_VERBOSE, + "%d clients connected (%d slaves), %zu bytes in use", + listLength(server.clients)-listLength(server.slaves), + listLength(server.slaves), + zmalloc_used_memory()); + } } /* We need to do a few operations on clients asynchronously. */ @@ -859,8 +914,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (pid == server.rdb_child_pid) { backgroundSaveDoneHandler(exitcode,bysignal); - } else { + } else if (pid == server.aof_child_pid) { backgroundRewriteDoneHandler(exitcode,bysignal); + } else { + redisLog(REDIS_WARNING, + "Warning, detected child with unmatched pid: %ld", + (long)pid); } updateDictResizePolicy(); } @@ -910,10 +969,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Replication cron function -- used to reconnect to master and * to detect transfer failures. */ - if (!(loops % 10)) replicationCron(); + run_with_period(1000) replicationCron(); + + /* Run the sentinel timer if we are in sentinel mode. */ + run_with_period(100) { + if (server.sentinel_mode) sentinelTimer(); + } server.cronloops++; - return 100; + return 1000/REDIS_HZ; } /* This function gets called every time Redis is entering the @@ -985,6 +1049,8 @@ void createSharedObjects(void) { "-READONLY You can't write against a read only slave.\r\n")); shared.oomerr = createObject(REDIS_STRING,sdsnew( "-OOM command not allowed when used memory > 'maxmemory'.\r\n")); + shared.execaborterr = createObject(REDIS_STRING,sdsnew( + "-EXECABORT Transaction discarded because of previous errors.\r\n")); shared.space = createObject(REDIS_STRING,sdsnew(" ")); shared.colon = createObject(REDIS_STRING,sdsnew(":")); shared.plus = createObject(REDIS_STRING,sdsnew("+")); @@ -1002,6 +1068,7 @@ void createSharedObjects(void) { shared.del = createStringObject("DEL",3); shared.rpop = createStringObject("RPOP",4); shared.lpop = createStringObject("LPOP",4); + shared.lpush = createStringObject("LPUSH",5); for (j = 0; j < REDIS_SHARED_INTEGERS; j++) { shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j); shared.integers[j]->encoding = REDIS_ENCODING_INT; @@ -1043,6 +1110,9 @@ void initServerConfig() { server.aof_rewrite_base_size = 0; server.aof_rewrite_scheduled = 0; server.aof_last_fsync = time(NULL); + server.aof_rewrite_time_last = -1; + server.aof_rewrite_time_start = -1; + server.aof_lastbgrewrite_status = REDIS_OK; server.aof_delayed_fsync = 0; server.aof_fd = -1; server.aof_selected_db = -1; /* Make sure the first time will not match */ @@ -1050,6 +1120,9 @@ void initServerConfig() { server.pidfile = zstrdup("/var/run/redis.pid"); server.rdb_filename = zstrdup("dump.rdb"); server.aof_filename = zstrdup("appendonly.aof"); + server.mdb_state = REDIS_MDB_OFF; + server.mdb_environment = zstrdup("archive"); + server.mdb_mapsize = 10485760; server.requirepass = NULL; server.rdb_compression = 1; server.rdb_checksum = 1; @@ -1090,6 +1163,7 @@ void initServerConfig() { server.repl_serve_stale_data = 1; server.repl_slave_ro = 1; server.repl_down_since = time(NULL); + server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; /* Client output buffer limits */ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0; @@ -1116,6 +1190,8 @@ void initServerConfig() { server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); server.lpushCommand = lookupCommandByCString("lpush"); + server.lpopCommand = lookupCommandByCString("lpop"); + server.rpopCommand = lookupCommandByCString("rpop"); /* Slow log */ server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN; @@ -1191,6 +1267,7 @@ void initServer() { server.slaves = listCreate(); server.monitors = listCreate(); server.unblocked_clients = listCreate(); + server.ready_keys = listCreate(); createSharedObjects(); adjustOpenFilesLimit(); @@ -1221,6 +1298,7 @@ void initServer() { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); + server.db[j].ready_keys = dictCreate(&setDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; } @@ -1231,9 +1309,11 @@ void initServer() { server.cronloops = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; - server.aof_rewrite_buf = sdsempty(); + aofRewriteBufferReset(); server.aof_buf = sdsempty(); server.lastsave = time(NULL); + server.rdb_save_time_last = -1; + server.rdb_save_time_start = -1; server.dirty = 0; server.stat_numcommands = 0; server.stat_numconnections = 0; @@ -1254,9 +1334,9 @@ void initServer() { server.stop_writes_on_bgsave_err = 1; aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, - acceptTcpHandler,NULL) == AE_ERR) oom("creating file event"); + acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event."); if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, - acceptUnixHandler,NULL) == AE_ERR) oom("creating file event"); + acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); if (server.aof_state == REDIS_AOF_ON) { server.aof_fd = open(server.aof_filename, @@ -1268,13 +1348,22 @@ void initServer() { } } + if (server.aof_state == REDIS_MDB_ON) { + int retval = startKeyArchive(); + if (retval != 0) { + redisLog(REDIS_WARNING, "Can't open the key-archive environment: %s", + mdb_strerror(retval)); + exit(1); + } + } + /* 32 bit instances are limited to 4GB of address space, so if there is * no explicit limit in the user provided configuration we set a limit - * at 3.5GB using maxmemory with 'noeviction' policy'. This saves - * useless crashes of the Redis instance. */ + * at 3 GB using maxmemory with 'noeviction' policy'. This avoids + * useless crashes of the Redis instance for out of memory. */ if (server.arch_bits == 32 && server.maxmemory == 0) { - redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now."); - server.maxmemory = 3584LL*(1024*1024); /* 3584 MB = 3.5 GB */ + redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now."); + server.maxmemory = 3072LL*(1024*1024); /* 3 GB */ server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION; } @@ -1305,6 +1394,9 @@ void populateCommandTable(void) { case 's': c->flags |= REDIS_CMD_NOSCRIPT; break; case 'R': c->flags |= REDIS_CMD_RANDOM; break; case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break; + case 'l': c->flags |= REDIS_CMD_LOADING; break; + case 't': c->flags |= REDIS_CMD_STALE; break; + case 'M': c->flags |= REDIS_CMD_SKIP_MONITOR; break; default: redisPanic("Unsupported command flag"); break; } f++; @@ -1380,7 +1472,7 @@ struct redisCommand *lookupCommandByCString(char *s) { } /* Propagate the specified command (in the context of the specified database id) - * to AOF, Slaves and Monitors. + * to AOF and Slaves. * * flags are an xor between: * + REDIS_PROPAGATE_NONE (no propagation of command at all) @@ -1410,8 +1502,12 @@ void call(redisClient *c, int flags) { /* Sent the command to clients in MONITOR mode, only if the commands are * not geneated from reading an AOF. */ - if (listLength(server.monitors) && !server.loading) + if (listLength(server.monitors) && + !server.loading && + !(c->cmd->flags & REDIS_CMD_SKIP_MONITOR)) + { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); + } /* Call the command. */ redisOpArrayInit(&server.also_propagate); @@ -1461,7 +1557,7 @@ void call(redisClient *c, int flags) { } /* If this function gets called we already read a whole - * command, argments are in the client argv/argc fields. + * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the * server for a bulk read from the client. * @@ -1483,11 +1579,13 @@ int processCommand(redisClient *c) { * such as wrong arity, bad command name and so forth. */ c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { + flagTransaction(c); addReplyErrorFormat(c,"unknown command '%s'", (char*)c->argv[0]->ptr); return REDIS_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { + flagTransaction(c); addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name); return REDIS_OK; @@ -1496,6 +1594,7 @@ int processCommand(redisClient *c) { /* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { + flagTransaction(c); addReplyError(c,"operation not permitted"); return REDIS_OK; } @@ -1508,6 +1607,7 @@ int processCommand(redisClient *c) { if (server.maxmemory) { int retval = freeMemoryIfNeeded(); if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) { + flagTransaction(c); addReply(c, shared.oomerr); return REDIS_OK; } @@ -1519,11 +1619,12 @@ int processCommand(redisClient *c) { && server.lastbgsave_status == REDIS_ERR && c->cmd->flags & REDIS_CMD_WRITE) { + flagTransaction(c); addReply(c, shared.bgsaveerr); return REDIS_OK; } - /* Don't accept wirte commands if this is a read only slave. But + /* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */ if (server.masterhost && server.repl_slave_ro && !(c->flags & REDIS_MASTER) && @@ -1548,20 +1649,23 @@ int processCommand(redisClient *c) { * we are a slave with a broken link with master. */ if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED && server.repl_serve_stale_data == 0 && - c->cmd->proc != infoCommand && c->cmd->proc != slaveofCommand) + !(c->cmd->flags & REDIS_CMD_STALE)) { + flagTransaction(c); addReply(c, shared.masterdownerr); return REDIS_OK; } - /* Loading DB? Return an error if the command is not INFO */ - if (server.loading && c->cmd->proc != infoCommand) { + /* Loading DB? Return an error if the command has not the + * REDIS_CMD_LOADING flag. */ + if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) { addReply(c, shared.loadingerr); return REDIS_OK; } - /* Lua script too slow? Only allow SHUTDOWN NOSAVE and SCRIPT KILL. */ + /* Lua script too slow? Only allow commands with REDIS_CMD_STALE flag. */ if (server.lua_timedout && + c->cmd->proc != authCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && @@ -1569,6 +1673,7 @@ int processCommand(redisClient *c) { c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) { + flagTransaction(c); addReply(c, shared.slowscripterr); return REDIS_OK; } @@ -1582,6 +1687,8 @@ int processCommand(redisClient *c) { addReply(c,shared.queued); } else { call(c,REDIS_CALL_FULL); + if (listLength(server.ready_keys)) + handleClientsBlockedOnLists(); } return REDIS_OK; } @@ -1644,10 +1751,52 @@ int prepareForShutdown(int flags) { /*================================== Commands =============================== */ +/* Return zero if strings are the same, non-zero if they are not. + * The comparison is performed in a way that prevents an attacker to obtain + * information about the nature of the strings just monitoring the execution + * time of the function. + * + * Note that limiting the comparison length to strings up to 512 bytes we + * can avoid leaking any information about the password length and any + * possible branch misprediction related leak. + */ +int time_independent_strcmp(char *a, char *b) { + char bufa[REDIS_AUTHPASS_MAX_LEN], bufb[REDIS_AUTHPASS_MAX_LEN]; + /* The above two strlen perform len(a) + len(b) operations where either + * a or b are fixed (our password) length, and the difference is only + * relative to the length of the user provided string, so no information + * leak is possible in the following two lines of code. */ + int alen = strlen(a); + int blen = strlen(b); + int j; + int diff = 0; + + /* We can't compare strings longer than our static buffers. + * Note that this will never pass the first test in practical circumstances + * so there is no info leak. */ + if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1; + + memset(bufa,0,sizeof(bufa)); /* Constant time. */ + memset(bufb,0,sizeof(bufb)); /* Constant time. */ + /* Again the time of the following two copies is proportional to + * len(a) + len(b) so no info is leaked. */ + memcpy(bufa,a,alen); + memcpy(bufb,b,blen); + + /* Always compare all the chars in the two buffers without + * conditional expressions. */ + for (j = 0; j < sizeof(bufa); j++) { + diff |= (bufa[j] ^ bufb[j]); + } + /* Length must be equal as well. */ + diff |= alen ^ blen; + return diff; /* If zero strings are the same. */ +} + void authCommand(redisClient *c) { if (!server.requirepass) { addReplyError(c,"Client sent AUTH, but no password is set"); - } else if (!strcmp(c->argv[1]->ptr, server.requirepass)) { + } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) { c->authenticated = 1; addReply(c,shared.ok); } else { @@ -1707,7 +1856,7 @@ sds genRedisInfoString(char *section) { unsigned long lol, bib; int allsections = 0, defsections = 0; int sections = 0; - + if (section) { allsections = strcasecmp(section,"all") == 0; defsections = strcasecmp(section,"default") == 0; @@ -1720,7 +1869,11 @@ sds genRedisInfoString(char *section) { /* Server */ if (allsections || defsections || !strcasecmp(section,"server")) { struct utsname name; + char *mode; + if (server.sentinel_mode) mode = "sentinel"; + else mode = "standalone"; + if (sections++) info = sdscat(info,"\r\n"); uname(&name); info = sdscatprintf(info, @@ -1728,6 +1881,7 @@ sds genRedisInfoString(char *section) { "redis_version:%s\r\n" "redis_git_sha1:%s\r\n" "redis_git_dirty:%d\r\n" + "redis_mode:%s\r\n" "os:%s %s %s\r\n" "arch_bits:%d\r\n" "multiplexing_api:%s\r\n" @@ -1741,6 +1895,7 @@ sds genRedisInfoString(char *section) { REDIS_VERSION, redisGitSHA1(), strtol(redisGitDirty(),NULL,10) > 0, + mode, name.sysname, name.release, name.machine, server.arch_bits, aeGetApiName(), @@ -1806,21 +1961,33 @@ sds genRedisInfoString(char *section) { info = sdscatprintf(info, "# Persistence\r\n" "loading:%d\r\n" + "rdb_changes_since_last_save:%lld\r\n" + "rdb_bgsave_in_progress:%d\r\n" + "rdb_last_save_time:%ld\r\n" + "rdb_last_bgsave_status:%s\r\n" + "rdb_last_bgsave_time_sec:%ld\r\n" + "rdb_current_bgsave_time_sec:%ld\r\n" "aof_enabled:%d\r\n" - "changes_since_last_save:%lld\r\n" - "bgsave_in_progress:%d\r\n" - "last_save_time:%ld\r\n" - "last_bgsave_status:%s\r\n" - "bgrewriteaof_in_progress:%d\r\n" - "bgrewriteaof_scheduled:%d\r\n", + "aof_rewrite_in_progress:%d\r\n" + "aof_rewrite_scheduled:%d\r\n" + "aof_last_rewrite_time_sec:%ld\r\n" + "aof_current_rewrite_time_sec:%ld\r\n" + "aof_last_bgrewrite_status:%s\r\n", server.loading, - server.aof_state != REDIS_AOF_OFF, server.dirty, server.rdb_child_pid != -1, server.lastsave, - server.lastbgsave_status == REDIS_OK ? "ok" : "err", + (server.lastbgsave_status == REDIS_OK) ? "ok" : "err", + server.rdb_save_time_last, + (server.rdb_child_pid == -1) ? + -1 : time(NULL)-server.rdb_save_time_start, + server.aof_state != REDIS_AOF_OFF, server.aof_child_pid != -1, - server.aof_rewrite_scheduled); + server.aof_rewrite_scheduled, + server.aof_rewrite_time_last, + (server.aof_child_pid == -1) ? + -1 : time(NULL)-server.aof_rewrite_time_start, + (server.aof_lastbgrewrite_status == REDIS_OK) ? "ok" : "err"); if (server.aof_state != REDIS_AOF_OFF) { info = sdscatprintf(info, @@ -1828,12 +1995,14 @@ sds genRedisInfoString(char *section) { "aof_base_size:%lld\r\n" "aof_pending_rewrite:%d\r\n" "aof_buffer_length:%zu\r\n" + "aof_rewrite_buffer_length:%lu\r\n" "aof_pending_bio_fsync:%llu\r\n" "aof_delayed_fsync:%lu\r\n", (long long) server.aof_current_size, (long long) server.aof_rewrite_base_size, server.aof_rewrite_scheduled, sdslen(server.aof_buf), + aofRewriteBufferSize(), bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC), server.aof_delayed_fsync); } @@ -1924,9 +2093,10 @@ sds genRedisInfoString(char *section) { if (server.repl_state == REDIS_REPL_TRANSFER) { info = sdscatprintf(info, - "master_sync_left_bytes:%ld\r\n" + "master_sync_left_bytes:%lld\r\n" "master_sync_last_io_seconds_ago:%d\r\n" - ,(long)server.repl_transfer_left, + , (long long) + (server.repl_transfer_size - server.repl_transfer_read), (int)(server.unixtime-server.repl_transfer_lastio) ); } @@ -1936,6 +2106,11 @@ sds genRedisInfoString(char *section) { "master_link_down_since_seconds:%ld\r\n", (long)server.unixtime-server.repl_down_since); } + info = sdscatprintf(info, + "slave_priority:%d\r\n" + "slave_read_only:%d\r\n", + server.slave_priority, + server.repl_slave_ro); } info = sdscatprintf(info, "connected_slaves:%lu\r\n", @@ -1967,7 +2142,7 @@ sds genRedisInfoString(char *section) { } if (state == NULL) continue; info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n", - slaveid,ip,port,state); + slaveid,ip,slave->slave_listening_port,state); slaveid++; } } @@ -2037,7 +2212,7 @@ void infoCommand(redisClient *c) { } void monitorCommand(redisClient *c) { - /* ignore MONITOR if aleady slave or in monitor mode */ + /* ignore MONITOR if already slave or in monitor mode */ if (c->flags & REDIS_SLAVE) return; c->flags |= (REDIS_SLAVE|REDIS_MONITOR); @@ -2086,7 +2261,7 @@ int freeMemoryIfNeeded(void) { } if (server.aof_state != REDIS_AOF_OFF) { mem_used -= sdslen(server.aof_buf); - mem_used -= sdslen(server.aof_rewrite_buf); + mem_used -= aofRewriteBufferSize(); } /* Check if we are over the memory limit. */ @@ -2136,7 +2311,7 @@ int freeMemoryIfNeeded(void) { de = dictGetRandomKey(dict); thiskey = dictGetKey(de); - /* When policy is volatile-lru we need an additonal lookup + /* When policy is volatile-lru we need an additional lookup * to locate the real key, as dict is set to db->expires. */ if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) de = dictFind(db->dict, thiskey); @@ -2175,6 +2350,8 @@ int freeMemoryIfNeeded(void) { long long delta; robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); + int archived = archive(db, keyobj); + redisAssert(archived != 0); propagateExpire(db,keyobj); /* We compute the amount of memory freed by dbDelete() alone. * It is possible that actually the memory needed to propagate @@ -2275,21 +2452,25 @@ void usage() { fprintf(stderr," ./redis-server /etc/redis/6379.conf\n"); fprintf(stderr," ./redis-server --port 7777\n"); fprintf(stderr," ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n"); - fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n"); + fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n"); + fprintf(stderr,"Sentinel mode:\n"); + fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n"); exit(1); } void redisAsciiArt(void) { #include "asciilogo.h" char *buf = zmalloc(1024*16); + char *mode = "stand alone"; + + if (server.sentinel_mode) mode = "sentinel"; snprintf(buf,1024*16,ascii_logo, REDIS_VERSION, redisGitSHA1(), strtol(redisGitDirty(),NULL,10) > 0, (sizeof(long) == 8) ? "64" : "32", - "stand alone", - server.port, + mode, server.port, (long) getpid() ); redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf); @@ -2327,17 +2508,60 @@ void setupSignalHandlers(void) { void memtest(size_t megabytes, int passes); +/* Returns 1 if there is --sentinel among the arguments or if + * argv[0] is exactly "redis-sentinel". */ +int checkForSentinelMode(int argc, char **argv) { + int j; + + if (strstr(argv[0],"redis-sentinel") != NULL) return 1; + for (j = 1; j < argc; j++) + if (!strcmp(argv[j],"--sentinel")) return 1; + return 0; +} + +/* Function called at startup to load RDB or AOF file in memory. */ +void loadDataFromDisk(void) { + long long start = ustime(); + if (server.aof_state == REDIS_AOF_ON) { + if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK) + redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); + } else { + if (rdbLoad(server.rdb_filename) == REDIS_OK) { + redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds", + (float)(ustime()-start)/1000000); + } else if (errno != ENOENT) { + redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting."); + exit(1); + } + } +} + +void redisOutOfMemoryHandler(size_t allocation_size) { + redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!", + allocation_size); + redisPanic("OOM"); +} + int main(int argc, char **argv) { - long long start; struct timeval tv; /* We need to initialize our libraries, and the server configuration. */ zmalloc_enable_thread_safeness(); + zmalloc_set_oom_handler(redisOutOfMemoryHandler); srand(time(NULL)^getpid()); gettimeofday(&tv,NULL); dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid()); + server.sentinel_mode = checkForSentinelMode(argc,argv); initServerConfig(); + /* We need to init sentinel right now as parsing the configuration file + * in sentinel mode will have the effect of populating the sentinel + * data structures with master nodes to monitor. */ + if (server.sentinel_mode) { + initSentinelConfig(); + initSentinel(); + } + if (argc >= 2) { int j = 1; /* First option to parse in argv[] */ sds options = sdsempty(); @@ -2383,33 +2607,31 @@ int main(int argc, char **argv) { loadServerConfig(configfile,options); sdsfree(options); } else { - redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'"); + redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis"); } if (server.daemonize) daemonize(); initServer(); if (server.daemonize) createPidFile(); redisAsciiArt(); - redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION); -#ifdef __linux__ - linuxOvercommitMemoryWarning(); -#endif - start = ustime(); - if (server.aof_state == REDIS_AOF_ON) { - if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK) - redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); - } else { - if (rdbLoad(server.rdb_filename) == REDIS_OK) { - redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds", - (float)(ustime()-start)/1000000); - } else if (errno != ENOENT) { - redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting."); - exit(1); - } + + if (!server.sentinel_mode) { + /* Things only needed when not running in Sentinel mode. */ + redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION); + #ifdef __linux__ + linuxOvercommitMemoryWarning(); + #endif + loadDataFromDisk(); + if (server.ipfd > 0) + redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); + if (server.sofd > 0) + redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); } - if (server.ipfd > 0) - redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); - if (server.sofd > 0) - redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); + + /* Warning the user about suspicious maxmemory setting. */ + if (server.maxmemory > 0 && server.maxmemory < 1024*1024) { + redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); + } + aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); aeDeleteEventLoop(server.el);