]> git.saurik.com Git - redis.git/blobdiff - src/redis.c
Scripting: add helper functions redis.error_reply() and redis.status_reply().
[redis.git] / src / redis.c
index 1fad059a5bab18f42c5fb8a621043cc875b6c772..0656d7c294bfaace069c7f8947f40ecad30491dd 100644 (file)
@@ -152,7 +152,7 @@ struct redisCommand redisCommandTable[] = {
     {"sismember",sismemberCommand,3,"r",0,NULL,1,1,1,0,0},
     {"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
     {"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
     {"sismember",sismemberCommand,3,"r",0,NULL,1,1,1,0,0},
     {"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
     {"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
-    {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0},
+    {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
     {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
     {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
     {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
     {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
     {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
     {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
@@ -797,13 +797,8 @@ void clientsCron(void) {
  * a macro is used: run_with_period(milliseconds) { .... }
  */
 
  * a macro is used: run_with_period(milliseconds) { .... }
  */
 
-/* Using the following macro you can run code inside serverCron() with the
- * specified period, specified in milliseconds.
- * The actual resolution depends on REDIS_HZ. */
-#define run_with_period(_ms_) if (!(loops % ((_ms_)/(1000/REDIS_HZ))))
-
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
-    int j, loops = server.cronloops;
+    int j;
     REDIS_NOTUSED(eventLoop);
     REDIS_NOTUSED(id);
     REDIS_NOTUSED(clientData);
     REDIS_NOTUSED(eventLoop);
     REDIS_NOTUSED(id);
     REDIS_NOTUSED(clientData);
@@ -872,11 +867,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     }
 
     /* Show information about connected clients */
     }
 
     /* Show information about connected clients */
-    run_with_period(5000) {
-        redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
-            listLength(server.clients)-listLength(server.slaves),
-            listLength(server.slaves),
-            zmalloc_used_memory());
+    if (!server.sentinel_mode) {
+        run_with_period(5000) {
+            redisLog(REDIS_VERBOSE,
+                "%d clients connected (%d slaves), %zu bytes in use",
+                listLength(server.clients)-listLength(server.slaves),
+                listLength(server.slaves),
+                zmalloc_used_memory());
+        }
     }
 
     /* We need to do a few operations on clients asynchronously. */
     }
 
     /* We need to do a few operations on clients asynchronously. */
@@ -956,6 +954,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * to detect transfer failures. */
     run_with_period(1000) replicationCron();
 
      * to detect transfer failures. */
     run_with_period(1000) replicationCron();
 
+    /* Run the sentinel timer if we are in sentinel mode. */
+    run_with_period(100) {
+        if (server.sentinel_mode) sentinelTimer();
+    }
+
     server.cronloops++;
     return 1000/REDIS_HZ;
 }
     server.cronloops++;
     return 1000/REDIS_HZ;
 }
@@ -1046,6 +1049,7 @@ void createSharedObjects(void) {
     shared.del = createStringObject("DEL",3);
     shared.rpop = createStringObject("RPOP",4);
     shared.lpop = createStringObject("LPOP",4);
     shared.del = createStringObject("DEL",3);
     shared.rpop = createStringObject("RPOP",4);
     shared.lpop = createStringObject("LPOP",4);
+    shared.lpush = createStringObject("LPUSH",5);
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
@@ -1164,6 +1168,8 @@ void initServerConfig() {
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
     server.lpushCommand = lookupCommandByCString("lpush");
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
     server.lpushCommand = lookupCommandByCString("lpush");
+    server.lpopCommand = lookupCommandByCString("lpop");
+    server.rpopCommand = lookupCommandByCString("rpop");
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -1239,6 +1245,7 @@ void initServer() {
     server.slaves = listCreate();
     server.monitors = listCreate();
     server.unblocked_clients = listCreate();
     server.slaves = listCreate();
     server.monitors = listCreate();
     server.unblocked_clients = listCreate();
+    server.ready_keys = listCreate();
 
     createSharedObjects();
     adjustOpenFilesLimit();
 
     createSharedObjects();
     adjustOpenFilesLimit();
@@ -1269,6 +1276,7 @@ void initServer() {
         server.db[j].dict = dictCreate(&dbDictType,NULL);
         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
         server.db[j].dict = dictCreate(&dbDictType,NULL);
         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
         server.db[j].id = j;
     }
         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
         server.db[j].id = j;
     }
@@ -1635,6 +1643,8 @@ int processCommand(redisClient *c) {
         addReply(c,shared.queued);
     } else {
         call(c,REDIS_CALL_FULL);
         addReply(c,shared.queued);
     } else {
         call(c,REDIS_CALL_FULL);
+        if (listLength(server.ready_keys))
+            handleClientsBlockedOnLists();
     }
     return REDIS_OK;
 }
     }
     return REDIS_OK;
 }
@@ -1802,7 +1812,7 @@ sds genRedisInfoString(char *section) {
     unsigned long lol, bib;
     int allsections = 0, defsections = 0;
     int sections = 0;
     unsigned long lol, bib;
     int allsections = 0, defsections = 0;
     int sections = 0;
-    
+
     if (section) {
         allsections = strcasecmp(section,"all") == 0;
         defsections = strcasecmp(section,"default") == 0;
     if (section) {
         allsections = strcasecmp(section,"all") == 0;
         defsections = strcasecmp(section,"default") == 0;
@@ -1815,7 +1825,11 @@ sds genRedisInfoString(char *section) {
     /* Server */
     if (allsections || defsections || !strcasecmp(section,"server")) {
         struct utsname name;
     /* Server */
     if (allsections || defsections || !strcasecmp(section,"server")) {
         struct utsname name;
+        char *mode;
 
 
+        if (server.sentinel_mode) mode = "sentinel";
+        else mode = "standalone";
+    
         if (sections++) info = sdscat(info,"\r\n");
         uname(&name);
         info = sdscatprintf(info,
         if (sections++) info = sdscat(info,"\r\n");
         uname(&name);
         info = sdscatprintf(info,
@@ -1823,6 +1837,7 @@ sds genRedisInfoString(char *section) {
             "redis_version:%s\r\n"
             "redis_git_sha1:%s\r\n"
             "redis_git_dirty:%d\r\n"
             "redis_version:%s\r\n"
             "redis_git_sha1:%s\r\n"
             "redis_git_dirty:%d\r\n"
+            "redis_mode:%s\r\n"
             "os:%s %s %s\r\n"
             "arch_bits:%d\r\n"
             "multiplexing_api:%s\r\n"
             "os:%s %s %s\r\n"
             "arch_bits:%d\r\n"
             "multiplexing_api:%s\r\n"
@@ -1836,6 +1851,7 @@ sds genRedisInfoString(char *section) {
             REDIS_VERSION,
             redisGitSHA1(),
             strtol(redisGitDirty(),NULL,10) > 0,
             REDIS_VERSION,
             redisGitSHA1(),
             strtol(redisGitDirty(),NULL,10) > 0,
+            mode,
             name.sysname, name.release, name.machine,
             server.arch_bits,
             aeGetApiName(),
             name.sysname, name.release, name.machine,
             server.arch_bits,
             aeGetApiName(),
@@ -1935,7 +1951,7 @@ sds genRedisInfoString(char *section) {
                 "aof_base_size:%lld\r\n"
                 "aof_pending_rewrite:%d\r\n"
                 "aof_buffer_length:%zu\r\n"
                 "aof_base_size:%lld\r\n"
                 "aof_pending_rewrite:%d\r\n"
                 "aof_buffer_length:%zu\r\n"
-                "aof_rewrite_buffer_length:%zu\r\n"
+                "aof_rewrite_buffer_length:%lu\r\n"
                 "aof_pending_bio_fsync:%llu\r\n"
                 "aof_delayed_fsync:%lu\r\n",
                 (long long) server.aof_current_size,
                 "aof_pending_bio_fsync:%llu\r\n"
                 "aof_delayed_fsync:%lu\r\n",
                 (long long) server.aof_current_size,
@@ -2387,21 +2403,25 @@ void usage() {
     fprintf(stderr,"       ./redis-server /etc/redis/6379.conf\n");
     fprintf(stderr,"       ./redis-server --port 7777\n");
     fprintf(stderr,"       ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
     fprintf(stderr,"       ./redis-server /etc/redis/6379.conf\n");
     fprintf(stderr,"       ./redis-server --port 7777\n");
     fprintf(stderr,"       ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
-    fprintf(stderr,"       ./redis-server /etc/myredis.conf --loglevel verbose\n");
+    fprintf(stderr,"       ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
+    fprintf(stderr,"Sentinel mode:\n");
+    fprintf(stderr,"       ./redis-server /etc/sentinel.conf --sentinel\n");
     exit(1);
 }
 
 void redisAsciiArt(void) {
 #include "asciilogo.h"
     char *buf = zmalloc(1024*16);
     exit(1);
 }
 
 void redisAsciiArt(void) {
 #include "asciilogo.h"
     char *buf = zmalloc(1024*16);
+    char *mode = "stand alone";
+
+    if (server.sentinel_mode) mode = "sentinel";
 
     snprintf(buf,1024*16,ascii_logo,
         REDIS_VERSION,
         redisGitSHA1(),
         strtol(redisGitDirty(),NULL,10) > 0,
         (sizeof(long) == 8) ? "64" : "32",
 
     snprintf(buf,1024*16,ascii_logo,
         REDIS_VERSION,
         redisGitSHA1(),
         strtol(redisGitDirty(),NULL,10) > 0,
         (sizeof(long) == 8) ? "64" : "32",
-        "stand alone",
-        server.port,
+        mode, server.port,
         (long) getpid()
     );
     redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf);
         (long) getpid()
     );
     redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf);
@@ -2439,6 +2459,34 @@ void setupSignalHandlers(void) {
 
 void memtest(size_t megabytes, int passes);
 
 
 void memtest(size_t megabytes, int passes);
 
+/* Returns 1 if there is --sentinel among the arguments or if
+ * argv[0] is exactly "redis-sentinel". */
+int checkForSentinelMode(int argc, char **argv) {
+    int j;
+
+    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
+    for (j = 1; j < argc; j++)
+        if (!strcmp(argv[j],"--sentinel")) return 1;
+    return 0;
+}
+
+/* Function called at startup to load RDB or AOF file in memory. */
+void loadDataFromDisk(void) {
+    long long start = ustime();
+    if (server.aof_state == REDIS_AOF_ON) {
+        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
+            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
+    } else {
+        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
+            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
+                (float)(ustime()-start)/1000000);
+        } else if (errno != ENOENT) {
+            redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
+            exit(1);
+        }
+    }
+}
+
 void redisOutOfMemoryHandler(size_t allocation_size) {
     redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!",
         allocation_size);
 void redisOutOfMemoryHandler(size_t allocation_size) {
     redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!",
         allocation_size);
@@ -2446,7 +2494,6 @@ void redisOutOfMemoryHandler(size_t allocation_size) {
 }
 
 int main(int argc, char **argv) {
 }
 
 int main(int argc, char **argv) {
-    long long start;
     struct timeval tv;
 
     /* We need to initialize our libraries, and the server configuration. */
     struct timeval tv;
 
     /* We need to initialize our libraries, and the server configuration. */
@@ -2455,8 +2502,17 @@ int main(int argc, char **argv) {
     srand(time(NULL)^getpid());
     gettimeofday(&tv,NULL);
     dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
     srand(time(NULL)^getpid());
     gettimeofday(&tv,NULL);
     dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
+    server.sentinel_mode = checkForSentinelMode(argc,argv);
     initServerConfig();
 
     initServerConfig();
 
+    /* We need to init sentinel right now as parsing the configuration file
+     * in sentinel mode will have the effect of populating the sentinel
+     * data structures with master nodes to monitor. */
+    if (server.sentinel_mode) {
+        initSentinelConfig();
+        initSentinel();
+    }
+
     if (argc >= 2) {
         int j = 1; /* First option to parse in argv[] */
         sds options = sdsempty();
     if (argc >= 2) {
         int j = 1; /* First option to parse in argv[] */
         sds options = sdsempty();
@@ -2502,33 +2558,26 @@ int main(int argc, char **argv) {
         loadServerConfig(configfile,options);
         sdsfree(options);
     } else {
         loadServerConfig(configfile,options);
         sdsfree(options);
     } else {
-        redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
+        redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
     }
     if (server.daemonize) daemonize();
     initServer();
     if (server.daemonize) createPidFile();
     redisAsciiArt();
     }
     if (server.daemonize) daemonize();
     initServer();
     if (server.daemonize) createPidFile();
     redisAsciiArt();
-    redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
-#ifdef __linux__
-    linuxOvercommitMemoryWarning();
-#endif
-    start = ustime();
-    if (server.aof_state == REDIS_AOF_ON) {
-        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
-            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
-    } else {
-        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
-            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
-                (float)(ustime()-start)/1000000);
-        } else if (errno != ENOENT) {
-            redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
-            exit(1);
-        }
+
+    if (!server.sentinel_mode) {
+        /* Things only needed when not runnign in Sentinel mode. */
+        redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
+    #ifdef __linux__
+        linuxOvercommitMemoryWarning();
+    #endif
+        loadDataFromDisk();
+        if (server.ipfd > 0)
+            redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+        if (server.sofd > 0)
+            redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
     }
     }
-    if (server.ipfd > 0)
-        redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
-    if (server.sofd > 0)
-        redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
+
     aeSetBeforeSleepProc(server.el,beforeSleep);
     aeMain(server.el);
     aeDeleteEventLoop(server.el);
     aeSetBeforeSleepProc(server.el,beforeSleep);
     aeMain(server.el);
     aeDeleteEventLoop(server.el);