* results. For instance SPOP and RANDOMKEY are two random commands.
* S: Sort command output array if called from script, so that the output
* is deterministic.
+ * l: Allow command while loading the database.
+ * t: Allow command while a slave has stale data but is not allowed to
+ * serve this data. Normally no command is accepted in this condition
+ * but just a few.
*/
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
{"sismember",sismemberCommand,3,"r",0,NULL,1,1,1,0,0},
{"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
{"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
- {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0},
+ {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
{"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
{"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
{"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
{"exec",execCommand,1,"s",0,NULL,0,0,0,0,0},
{"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
+ {"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
{"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
{"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
- {"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
- {"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0},
+ {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0},
+ {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0},
{"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
{"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0},
{"pttl",pttlCommand,2,"r",0,NULL,1,1,1,0,0},
{"persist",persistCommand,2,"w",0,NULL,1,1,1,0,0},
- {"slaveof",slaveofCommand,3,"as",0,NULL,0,0,0,0,0},
+ {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
{"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
{"config",configCommand,-2,"ar",0,NULL,0,0,0,0,0},
- {"subscribe",subscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
- {"unsubscribe",unsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
- {"psubscribe",psubscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
- {"punsubscribe",punsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
- {"publish",publishCommand,3,"pf",0,NULL,0,0,0,0,0},
+ {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
+ {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
+ {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
+ {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
+ {"publish",publishCommand,3,"pflt",0,NULL,0,0,0,0,0},
{"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
{"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
{"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
if (server.logfile) close(fd);
}
-/* Redis generally does not try to recover from out of memory conditions
- * when allocating objects or strings, it is not clear if it will be possible
- * to report this condition to the client since the networking layer itself
- * is based on heap allocation for send buffers, so we simply abort.
- * At least the code will be simpler to read... */
-void oom(const char *msg) {
- redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
- sleep(1);
- abort();
-}
-
/* Return the UNIX time in microseconds */
long long ustime(void) {
struct timeval tv;
* a macro is used: run_with_period(milliseconds) { .... }
*/
-/* Using the following macro you can run code inside serverCron() with the
- * specified period, specified in milliseconds.
- * The actual resolution depends on REDIS_HZ. */
-#define run_with_period(_ms_) if (!(loops % ((_ms_)/(1000/REDIS_HZ))))
-
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
- int j, loops = server.cronloops;
+ int j;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
}
/* Show information about connected clients */
- run_with_period(5000) {
- redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
- listLength(server.clients)-listLength(server.slaves),
- listLength(server.slaves),
- zmalloc_used_memory());
+ if (!server.sentinel_mode) {
+ run_with_period(5000) {
+ redisLog(REDIS_VERBOSE,
+ "%d clients connected (%d slaves), %zu bytes in use",
+ listLength(server.clients)-listLength(server.slaves),
+ listLength(server.slaves),
+ zmalloc_used_memory());
+ }
}
/* We need to do a few operations on clients asynchronously. */
* to detect transfer failures. */
run_with_period(1000) replicationCron();
+ /* Run the sentinel timer if we are in sentinel mode. */
+ run_with_period(100) {
+ if (server.sentinel_mode) sentinelTimer();
+ }
+
server.cronloops++;
return 1000/REDIS_HZ;
}
shared.del = createStringObject("DEL",3);
shared.rpop = createStringObject("RPOP",4);
shared.lpop = createStringObject("LPOP",4);
+ shared.lpush = createStringObject("LPUSH",5);
for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
shared.integers[j]->encoding = REDIS_ENCODING_INT;
server.aof_last_fsync = time(NULL);
server.aof_rewrite_time_last = -1;
server.aof_rewrite_time_start = -1;
+ server.aof_lastbgrewrite_status = REDIS_OK;
server.aof_delayed_fsync = 0;
server.aof_fd = -1;
server.aof_selected_db = -1; /* Make sure the first time will not match */
server.repl_serve_stale_data = 1;
server.repl_slave_ro = 1;
server.repl_down_since = time(NULL);
+ server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
/* Client output buffer limits */
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
+ server.lpopCommand = lookupCommandByCString("lpop");
+ server.rpopCommand = lookupCommandByCString("rpop");
/* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
server.slaves = listCreate();
server.monitors = listCreate();
server.unblocked_clients = listCreate();
+ server.ready_keys = listCreate();
createSharedObjects();
adjustOpenFilesLimit();
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+ server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
}
server.stop_writes_on_bgsave_err = 1;
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
- acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
+ acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
- acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
+ acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
if (server.aof_state == REDIS_AOF_ON) {
server.aof_fd = open(server.aof_filename,
case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
case 'R': c->flags |= REDIS_CMD_RANDOM; break;
case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
+ case 'l': c->flags |= REDIS_CMD_LOADING; break;
+ case 't': c->flags |= REDIS_CMD_STALE; break;
default: redisPanic("Unsupported command flag"); break;
}
f++;
return REDIS_OK;
}
- /* Don't accept wirte commands if this is a read only slave. But
+ /* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & REDIS_MASTER) &&
* we are a slave with a broken link with master. */
if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
server.repl_serve_stale_data == 0 &&
- c->cmd->proc != infoCommand && c->cmd->proc != slaveofCommand)
+ !(c->cmd->flags & REDIS_CMD_STALE))
{
addReply(c, shared.masterdownerr);
return REDIS_OK;
}
- /* Loading DB? Return an error if the command is not INFO */
- if (server.loading && c->cmd->proc != infoCommand) {
+ /* Loading DB? Return an error if the command does not have the
+ * REDIS_CMD_LOADING flag. */
+ if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
addReply(c, shared.loadingerr);
return REDIS_OK;
}
- /* Lua script too slow? Only allow SHUTDOWN NOSAVE and SCRIPT KILL. */
+ /* Lua script too slow? Only allow a limited number of commands. */
if (server.lua_timedout &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
addReply(c,shared.queued);
} else {
call(c,REDIS_CALL_FULL);
+ if (listLength(server.ready_keys))
+ handleClientsBlockedOnLists();
}
return REDIS_OK;
}
/*================================== Commands =============================== */
+/* Compare two NUL-terminated strings in a way that does not reveal, through
+ * execution time, the position at which they differ.
+ *
+ * Returns zero if the strings are the same, non-zero if they are not.
+ *
+ * The comparison is performed so that an attacker cannot obtain information
+ * about the nature of the strings just by monitoring the execution time of
+ * the function.
+ *
+ * Note that by limiting the comparison to strings of up to
+ * REDIS_AUTHPASS_MAX_LEN bytes we can avoid leaking any information about
+ * the password length and any possible branch misprediction related leak.
+ */
+int time_independent_strcmp(char *a, char *b) {
+    char bufa[REDIS_AUTHPASS_MAX_LEN], bufb[REDIS_AUTHPASS_MAX_LEN];
+    /* The two strlen() calls below perform len(a) + len(b) operations where
+     * either a or b has a fixed length (our password), and the difference is
+     * only relative to the length of the user provided string, so no
+     * information leak is possible in the following two lines of code.
+     *
+     * Lengths are kept as size_t: strlen() returns size_t and the values are
+     * compared against sizeof() below, so a signed type would both truncate
+     * and trigger a signed/unsigned comparison. */
+    size_t alen = strlen(a);
+    size_t blen = strlen(b);
+    size_t j;
+    int diff = 0;
+
+    /* We can't compare strings longer than our static buffers.
+     * Note that this will never pass the first test in practical
+     * circumstances so there is no info leak. */
+    if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
+
+    memset(bufa,0,sizeof(bufa)); /* Constant time. */
+    memset(bufb,0,sizeof(bufb)); /* Constant time. */
+    /* Again the time of the following two copies is proportional to
+     * len(a) + len(b) so no info is leaked. */
+    memcpy(bufa,a,alen);
+    memcpy(bufb,b,blen);
+
+    /* Always compare all the chars in the two buffers without
+     * conditional expressions. */
+    for (j = 0; j < sizeof(bufa); j++) {
+        diff |= (bufa[j] ^ bufb[j]);
+    }
+    /* Length must be equal as well. Both lengths are bounded by the buffer
+     * size at this point, so the XOR always fits in an int. */
+    diff |= (int)(alen ^ blen);
+    return diff; /* If zero strings are the same. */
+}
+
+
void authCommand(redisClient *c) {
if (!server.requirepass) {
addReplyError(c,"Client sent AUTH, but no password is set");
- } else if (!strcmp(c->argv[1]->ptr, server.requirepass)) {
+ } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
c->authenticated = 1;
addReply(c,shared.ok);
} else {
"aof_rewrite_in_progress:%d\r\n"
"aof_rewrite_scheduled:%d\r\n"
"aof_last_rewrite_time_sec:%ld\r\n"
- "aof_current_rewrite_time_sec:%ld\r\n",
+ "aof_current_rewrite_time_sec:%ld\r\n"
+ "aof_last_bgrewrite_status:%s\r\n",
server.loading,
server.dirty,
server.rdb_child_pid != -1,
server.lastsave,
- server.lastbgsave_status == REDIS_OK ? "ok" : "err",
+ (server.lastbgsave_status == REDIS_OK) ? "ok" : "err",
server.rdb_save_time_last,
(server.rdb_child_pid == -1) ?
-1 : time(NULL)-server.rdb_save_time_start,
server.aof_rewrite_scheduled,
server.aof_rewrite_time_last,
(server.aof_child_pid == -1) ?
- -1 : time(NULL)-server.aof_rewrite_time_start);
+ -1 : time(NULL)-server.aof_rewrite_time_start,
+ (server.aof_lastbgrewrite_status == REDIS_OK) ? "ok" : "err");
if (server.aof_state != REDIS_AOF_OFF) {
info = sdscatprintf(info,
"aof_base_size:%lld\r\n"
"aof_pending_rewrite:%d\r\n"
"aof_buffer_length:%zu\r\n"
- "aof_rewrite_buffer_length:%zu\r\n"
+ "aof_rewrite_buffer_length:%lu\r\n"
"aof_pending_bio_fsync:%llu\r\n"
"aof_delayed_fsync:%lu\r\n",
(long long) server.aof_current_size,
if (server.repl_state == REDIS_REPL_TRANSFER) {
info = sdscatprintf(info,
- "master_sync_left_bytes:%ld\r\n"
+ "master_sync_left_bytes:%lld\r\n"
"master_sync_last_io_seconds_ago:%d\r\n"
- ,(long)server.repl_transfer_left,
+ , (long long)
+ (server.repl_transfer_size - server.repl_transfer_read),
(int)(server.unixtime-server.repl_transfer_lastio)
);
}
"master_link_down_since_seconds:%ld\r\n",
(long)server.unixtime-server.repl_down_since);
}
+ info = sdscatprintf(info,
+ "slave_priority:%d\r\n", server.slave_priority);
}
info = sdscatprintf(info,
"connected_slaves:%lu\r\n",
}
if (state == NULL) continue;
info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n",
- slaveid,ip,port,state);
+ slaveid,ip,slave->slave_listening_port,state);
slaveid++;
}
}
fprintf(stderr," ./redis-server /etc/redis/6379.conf\n");
fprintf(stderr," ./redis-server --port 7777\n");
fprintf(stderr," ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
- fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n");
+ fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
+ fprintf(stderr,"Sentinel mode:\n");
+ fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n");
exit(1);
}
void redisAsciiArt(void) {
#include "asciilogo.h"
char *buf = zmalloc(1024*16);
+ char *mode = "stand alone";
+
+ if (server.cluster_enabled) mode = "cluster";
+ else if (server.sentinel_mode) mode = "sentinel";
snprintf(buf,1024*16,ascii_logo,
REDIS_VERSION,
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
(sizeof(long) == 8) ? "64" : "32",
- "stand alone",
- server.port,
+ mode, server.port,
(long) getpid()
);
redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf);
void memtest(size_t megabytes, int passes);
+/* Out-of-memory handler: logs the size of the allocation that failed and
+ * then panics. It is installed in main() through zmalloc_set_oom_handler(). */
+void redisOutOfMemoryHandler(size_t allocation_size) {
+    redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!",
+        allocation_size);
+    redisPanic("OOM");
+}
+
+/* Returns 1 if there is --sentinel among the arguments or if
+ * argv[0] contains the string "redis-sentinel" (a substring match, so a
+ * path such as /usr/bin/redis-sentinel qualifies). Returns 0 otherwise. */
+int checkForSentinelMode(int argc, char **argv) {
+    int j;
+
+    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
+    for (j = 1; j < argc; j++)
+        if (!strcmp(argv[j],"--sentinel")) return 1;
+    return 0;
+}
+
+/* Function called at startup to load RDB or AOF file in memory.
+ *
+ * When AOF is enabled the append only file is loaded, otherwise the RDB
+ * file is. On success the elapsed loading time is logged. A failed RDB load
+ * terminates the process unless errno is ENOENT (no file to load). */
+void loadDataFromDisk(void) {
+    long long start = ustime();
+    if (server.aof_state == REDIS_AOF_ON) {
+        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
+            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
+    } else {
+        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
+            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
+                (float)(ustime()-start)/1000000);
+        } else if (errno != ENOENT) {
+            redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
+            exit(1);
+        }
+    }
+}
+
int main(int argc, char **argv) {
- long long start;
struct timeval tv;
/* We need to initialize our libraries, and the server configuration. */
zmalloc_enable_thread_safeness();
+ zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
+ server.sentinel_mode = checkForSentinelMode(argc,argv);
initServerConfig();
+ /* We need to init sentinel right now as parsing the configuration file
+ * in sentinel mode will have the effect of populating the sentinel
+ * data structures with master nodes to monitor. */
+ if (server.sentinel_mode) {
+ initSentinelConfig();
+ initSentinel();
+ }
+
if (argc >= 2) {
int j = 1; /* First option to parse in argv[] */
sds options = sdsempty();
initServer();
if (server.daemonize) createPidFile();
redisAsciiArt();
- redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
-#ifdef __linux__
- linuxOvercommitMemoryWarning();
-#endif
- start = ustime();
- if (server.aof_state == REDIS_AOF_ON) {
- if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
- redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
- } else {
- if (rdbLoad(server.rdb_filename) == REDIS_OK) {
- redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
- (float)(ustime()-start)/1000000);
- } else if (errno != ENOENT) {
- redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
- exit(1);
- }
+
+ if (!server.sentinel_mode) {
+ /* Things only needed when not running in Sentinel mode. */
+ redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
+ #ifdef __linux__
+ linuxOvercommitMemoryWarning();
+ #endif
+ loadDataFromDisk();
+ if (server.ipfd > 0)
+ redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+ if (server.sofd > 0)
+ redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
}
- if (server.ipfd > 0)
- redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
- if (server.sofd > 0)
- redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
+
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);