/*
- * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* t: Allow command while a slave has stale data but is not allowed to
* serve this data. Normally no command is accepted in this condition
* but just a few.
+ * M: Do not automatically propagate the command on MONITOR.
*/
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
{"sismember",sismemberCommand,3,"r",0,NULL,1,1,1,0,0},
{"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
{"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
- {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0},
+ {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
{"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
{"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
{"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
{"lastsave",lastsaveCommand,1,"r",0,NULL,0,0,0,0,0},
{"type",typeCommand,2,"r",0,NULL,1,1,1,0,0},
{"multi",multiCommand,1,"rs",0,NULL,0,0,0,0,0},
- {"exec",execCommand,1,"s",0,NULL,0,0,0,0,0},
+ {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
{"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
{"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
return memcmp(key1, key2, l1) == 0;
}
-/* A case insensitive version used for the command lookup table. */
+/* A case insensitive version used for the command lookup table and other
+ * places where case insensitive non binary-safe comparison is needed. */
int dictSdsKeyCaseCompare(void *privdata, const void *key1,
const void *key2)
{
dictRedisObjectDestructor /* val destructor */
};
+/* server.lua_scripts: sha (as sds string) -> script (as robj) cache. Keys are compared with dictSdsKeyCaseCompare, i.e. case insensitively. */
+dictType shaScriptObjectDictType = {
+ dictSdsCaseHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCaseCompare, /* key compare */
+ dictSdsDestructor, /* key destructor */
+ dictRedisObjectDestructor /* val destructor */
+};
+
/* Db->expires */
dictType keyptrDictType = {
dictSdsHash, /* hash function */
* a macro is used: run_with_period(milliseconds) { .... }
*/
-/* Using the following macro you can run code inside serverCron() with the
- * specified period, specified in milliseconds.
- * The actual resolution depends on REDIS_HZ. */
-#define run_with_period(_ms_) if (!(loops % ((_ms_)/(1000/REDIS_HZ))))
-
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
- int j, loops = server.cronloops;
+ int j;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
}
/* Show information about connected clients */
- run_with_period(5000) {
- redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
- listLength(server.clients)-listLength(server.slaves),
- listLength(server.slaves),
- zmalloc_used_memory());
+ if (!server.sentinel_mode) {
+ run_with_period(5000) {
+ redisLog(REDIS_VERBOSE,
+ "%d clients connected (%d slaves), %zu bytes in use",
+ listLength(server.clients)-listLength(server.slaves),
+ listLength(server.slaves),
+ zmalloc_used_memory());
+ }
}
/* We need to do a few operations on clients asynchronously. */
if (pid == server.rdb_child_pid) {
backgroundSaveDoneHandler(exitcode,bysignal);
- } else {
+ } else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
+ } else {
+ redisLog(REDIS_WARNING,
+ "Warning, detected child with unmatched pid: %ld",
+ (long)pid);
}
updateDictResizePolicy();
}
* to detect transfer failures. */
run_with_period(1000) replicationCron();
+ /* Run the sentinel timer if we are in sentinel mode. */
+ run_with_period(100) {
+ if (server.sentinel_mode) sentinelTimer();
+ }
+
server.cronloops++;
return 1000/REDIS_HZ;
}
"-READONLY You can't write against a read only slave.\r\n"));
shared.oomerr = createObject(REDIS_STRING,sdsnew(
"-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
+ shared.execaborterr = createObject(REDIS_STRING,sdsnew(
+ "-EXECABORT Transaction discarded because of previous errors.\r\n"));
shared.space = createObject(REDIS_STRING,sdsnew(" "));
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
shared.del = createStringObject("DEL",3);
shared.rpop = createStringObject("RPOP",4);
shared.lpop = createStringObject("LPOP",4);
+ shared.lpush = createStringObject("LPUSH",5);
for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
shared.integers[j]->encoding = REDIS_ENCODING_INT;
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
+ server.lpopCommand = lookupCommandByCString("lpop");
+ server.rpopCommand = lookupCommandByCString("rpop");
/* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
server.slaves = listCreate();
server.monitors = listCreate();
server.unblocked_clients = listCreate();
+ server.ready_keys = listCreate();
createSharedObjects();
adjustOpenFilesLimit();
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+ server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
}
/* 32 bit instances are limited to 4GB of address space, so if there is
* no explicit limit in the user provided configuration we set a limit
- * at 3.5GB using maxmemory with 'noeviction' policy'. This saves
- * useless crashes of the Redis instance. */
+ * at 3 GB using maxmemory with 'noeviction' policy. This avoids
+ * useless crashes of the Redis instance due to out of memory conditions. */
if (server.arch_bits == 32 && server.maxmemory == 0) {
- redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now.");
- server.maxmemory = 3584LL*(1024*1024); /* 3584 MB = 3.5 GB */
+ redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
+ server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
}
case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
case 'l': c->flags |= REDIS_CMD_LOADING; break;
case 't': c->flags |= REDIS_CMD_STALE; break;
+ case 'M': c->flags |= REDIS_CMD_SKIP_MONITOR; break;
default: redisPanic("Unsupported command flag"); break;
}
f++;
}
/* Propagate the specified command (in the context of the specified database id)
- * to AOF, Slaves and Monitors.
+ * to AOF and Slaves.
*
* flags are an xor between:
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
/* Send the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */
- if (listLength(server.monitors) && !server.loading)
+ if (listLength(server.monitors) &&
+ !server.loading &&
+ !(c->cmd->flags & REDIS_CMD_SKIP_MONITOR))
+ {
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
+ }
/* Call the command. */
redisOpArrayInit(&server.also_propagate);
}
/* If this function gets called we already read a whole
- * command, argments are in the client argv/argc fields.
+ * command, arguments are in the client argv/argc fields.
* processCommand() executes the command or prepares the
* server for a bulk read from the client.
*
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
+ flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return REDIS_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
+ flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
+ flagTransaction(c);
addReplyError(c,"operation not permitted");
return REDIS_OK;
}
if (server.maxmemory) {
int retval = freeMemoryIfNeeded();
if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
+ flagTransaction(c);
addReply(c, shared.oomerr);
return REDIS_OK;
}
&& server.lastbgsave_status == REDIS_ERR &&
c->cmd->flags & REDIS_CMD_WRITE)
{
+ flagTransaction(c);
addReply(c, shared.bgsaveerr);
return REDIS_OK;
}
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & REDIS_CMD_STALE))
{
+ flagTransaction(c);
addReply(c, shared.masterdownerr);
return REDIS_OK;
}
/* Lua script too slow? Only allow a limited number of commands. */
if (server.lua_timedout &&
+ c->cmd->proc != authCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
+ flagTransaction(c);
addReply(c, shared.slowscripterr);
return REDIS_OK;
}
addReply(c,shared.queued);
} else {
call(c,REDIS_CALL_FULL);
+ if (listLength(server.ready_keys))
+ handleClientsBlockedOnLists();
}
return REDIS_OK;
}
unsigned long lol, bib;
int allsections = 0, defsections = 0;
int sections = 0;
-
+
if (section) {
allsections = strcasecmp(section,"all") == 0;
defsections = strcasecmp(section,"default") == 0;
/* Server */
if (allsections || defsections || !strcasecmp(section,"server")) {
struct utsname name;
+ char *mode;
+ if (server.sentinel_mode) mode = "sentinel";
+ else mode = "standalone";
+
if (sections++) info = sdscat(info,"\r\n");
uname(&name);
info = sdscatprintf(info,
"redis_version:%s\r\n"
"redis_git_sha1:%s\r\n"
"redis_git_dirty:%d\r\n"
+ "redis_mode:%s\r\n"
"os:%s %s %s\r\n"
"arch_bits:%d\r\n"
"multiplexing_api:%s\r\n"
REDIS_VERSION,
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
+ mode,
name.sysname, name.release, name.machine,
server.arch_bits,
aeGetApiName(),
(long)server.unixtime-server.repl_down_since);
}
info = sdscatprintf(info,
- "slave_priority:%d\r\n", server.slave_priority);
+ "slave_priority:%d\r\n"
+ "slave_read_only:%d\r\n",
+ server.slave_priority,
+ server.repl_slave_ro);
}
info = sdscatprintf(info,
"connected_slaves:%lu\r\n",
}
void monitorCommand(redisClient *c) {
- /* ignore MONITOR if aleady slave or in monitor mode */
+ /* ignore MONITOR if already slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return;
c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
de = dictGetRandomKey(dict);
thiskey = dictGetKey(de);
- /* When policy is volatile-lru we need an additonal lookup
+ /* When policy is volatile-lru we need an additional lookup
* to locate the real key, as dict is set to db->expires. */
if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
de = dictFind(db->dict, thiskey);
fprintf(stderr," ./redis-server /etc/redis/6379.conf\n");
fprintf(stderr," ./redis-server --port 7777\n");
fprintf(stderr," ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
- fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n");
+ fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
+ fprintf(stderr,"Sentinel mode:\n");
+ fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n");
exit(1);
}
void redisAsciiArt(void) {
#include "asciilogo.h"
char *buf = zmalloc(1024*16);
+ char *mode = "stand alone";
+
+ if (server.sentinel_mode) mode = "sentinel";
snprintf(buf,1024*16,ascii_logo,
REDIS_VERSION,
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
(sizeof(long) == 8) ? "64" : "32",
- "stand alone",
- server.port,
+ mode, server.port,
(long) getpid()
);
redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf);
void memtest(size_t megabytes, int passes);
+/* Returns 1 if there is --sentinel among the arguments (argv[1..argc-1]) or if
+ * argv[0] contains the substring "redis-sentinel" (strstr() match, not an exact comparison); returns 0 otherwise. */
+int checkForSentinelMode(int argc, char **argv) {
+ int j;
+
+ if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
+ for (j = 1; j < argc; j++)
+ if (!strcmp(argv[j],"--sentinel")) return 1;
+ return 0;
+}
+
+/* Function called at startup to load the dataset in memory: the AOF file when server.aof_state is REDIS_AOF_ON, the RDB file otherwise. */
+void loadDataFromDisk(void) {
+ long long start = ustime(); /* reference time for the elapsed-time log lines below */
+ if (server.aof_state == REDIS_AOF_ON) {
+ if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
+ redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
+ } else {
+ if (rdbLoad(server.rdb_filename) == REDIS_OK) {
+ redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
+ (float)(ustime()-start)/1000000);
+ } else if (errno != ENOENT) { /* rdbLoad failed with an errno other than ENOENT: give up */
+ redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
+ exit(1);
+ }
+ }
+}
+
void redisOutOfMemoryHandler(size_t allocation_size) {
redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!",
allocation_size);
}
int main(int argc, char **argv) {
- long long start;
struct timeval tv;
/* We need to initialize our libraries, and the server configuration. */
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
+ server.sentinel_mode = checkForSentinelMode(argc,argv);
initServerConfig();
+ /* We need to init sentinel right now as parsing the configuration file
+ * in sentinel mode will have the effect of populating the sentinel
+ * data structures with master nodes to monitor. */
+ if (server.sentinel_mode) {
+ initSentinelConfig();
+ initSentinel();
+ }
+
if (argc >= 2) {
int j = 1; /* First option to parse in argv[] */
sds options = sdsempty();
loadServerConfig(configfile,options);
sdsfree(options);
} else {
- redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
+ redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
}
if (server.daemonize) daemonize();
initServer();
if (server.daemonize) createPidFile();
redisAsciiArt();
- redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
-#ifdef __linux__
- linuxOvercommitMemoryWarning();
-#endif
- start = ustime();
- if (server.aof_state == REDIS_AOF_ON) {
- if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
- redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
- } else {
- if (rdbLoad(server.rdb_filename) == REDIS_OK) {
- redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
- (float)(ustime()-start)/1000000);
- } else if (errno != ENOENT) {
- redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
- exit(1);
- }
+
+ if (!server.sentinel_mode) {
+ /* Things only needed when not running in Sentinel mode. */
+ redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
+ #ifdef __linux__
+ linuxOvercommitMemoryWarning();
+ #endif
+ loadDataFromDisk();
+ if (server.ipfd > 0)
+ redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+ if (server.sofd > 0)
+ redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
}
- if (server.ipfd > 0)
- redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
- if (server.sofd > 0)
- redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
+
+ /* Warn the user about a suspicious maxmemory setting. */
+ if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
+ redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
+ }
+
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);