]> git.saurik.com Git - redis.git/blobdiff - src/redis.c
New INFO field aof_delayed_fsync introduced.
[redis.git] / src / redis.c
index 01ec0531ef15711d512b43a08e224d06a9f9b59d..39de86af6415ffffdc86c8fa61bfa2dd570df8e5 100644 (file)
@@ -211,7 +211,7 @@ struct redisCommand redisCommandTable[] = {
     {"lastsave",lastsaveCommand,1,"r",0,NULL,0,0,0,0,0},
     {"type",typeCommand,2,"r",0,NULL,1,1,1,0,0},
     {"multi",multiCommand,1,"rs",0,NULL,0,0,0,0,0},
-    {"exec",execCommand,1,"wms",0,NULL,0,0,0,0,0},
+    {"exec",execCommand,1,"s",0,NULL,0,0,0,0,0},
     {"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
     {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
     {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
@@ -223,24 +223,22 @@ struct redisCommand redisCommandTable[] = {
     {"pttl",pttlCommand,2,"r",0,NULL,1,1,1,0,0},
     {"persist",persistCommand,2,"w",0,NULL,1,1,1,0,0},
     {"slaveof",slaveofCommand,3,"aws",0,NULL,0,0,0,0,0},
-    {"debug",debugCommand,-2,"aws",0,NULL,0,0,0,0,0},
+    {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
     {"config",configCommand,-2,"ar",0,NULL,0,0,0,0,0},
     {"subscribe",subscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
     {"unsubscribe",unsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
     {"psubscribe",psubscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
     {"punsubscribe",punsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
-    {"publish",publishCommand,3,"rpf",0,NULL,0,0,0,0,0},
+    {"publish",publishCommand,3,"pf",0,NULL,0,0,0,0,0},
     {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
     {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
-    {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
     {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
     {"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0},
-    {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
     {"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0},
     {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0},
     {"client",clientCommand,-2,"ar",0,NULL,0,0,0,0,0},
-    {"eval",evalCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
-    {"evalsha",evalShaCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
+    {"eval",evalCommand,-3,"s",0,zunionInterGetKeys,0,0,0,0,0},
+    {"evalsha",evalShaCommand,-3,"s",0,zunionInterGetKeys,0,0,0,0,0},
     {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
     {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
     {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}
@@ -507,17 +505,6 @@ dictType keylistDictType = {
     dictListDestructor          /* val destructor */
 };
 
-/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
- * clusterNode structures. */
-dictType clusterNodesDictType = {
-    dictSdsHash,                /* hash function */
-    NULL,                       /* key dup */
-    NULL,                       /* val dup */
-    dictSdsKeyCompare,          /* key compare */
-    dictSdsDestructor,          /* key destructor */
-    NULL                        /* val destructor */
-};
-
 int htNeedsResize(dict *dict) {
     long long size, used;
 
@@ -616,6 +603,110 @@ void updateLRUClock(void) {
                                                 REDIS_LRU_CLOCK_MAX;
 }
 
+
+/* Add a sample to the operations per second array of samples. */
+void trackOperationsPerSecond(void) {
+    long long t = mstime() - server.ops_sec_last_sample_time;
+    long long ops = server.stat_numcommands - server.ops_sec_last_sample_ops;
+    long long ops_sec;
+
+    ops_sec = t > 0 ? (ops*1000/t) : 0;
+
+    server.ops_sec_samples[server.ops_sec_idx] = ops_sec;
+    server.ops_sec_idx = (server.ops_sec_idx+1) % REDIS_OPS_SEC_SAMPLES;
+    server.ops_sec_last_sample_time = mstime();
+    server.ops_sec_last_sample_ops = server.stat_numcommands;
+}
+
+/* Return the mean of all the samples. */
+long long getOperationsPerSecond(void) {
+    int j;
+    long long sum = 0;
+
+    for (j = 0; j < REDIS_OPS_SEC_SAMPLES; j++)
+        sum += server.ops_sec_samples[j];
+    return sum / REDIS_OPS_SEC_SAMPLES;
+}
+
+/* Check for timeouts. Returns non-zero if the client was terminated */
+int clientsCronHandleTimeout(redisClient *c) {
+    time_t now = server.unixtime;
+
+    if (server.maxidletime &&
+        !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
+        !(c->flags & REDIS_MASTER) &&   /* no timeout for masters */
+        !(c->flags & REDIS_BLOCKED) &&  /* no timeout for BLPOP */
+        dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
+        listLength(c->pubsub_patterns) == 0 &&
+        (now - c->lastinteraction > server.maxidletime))
+    {
+        redisLog(REDIS_VERBOSE,"Closing idle client");
+        freeClient(c);
+        return 1;
+    } else if (c->flags & REDIS_BLOCKED) {
+        if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
+            addReply(c,shared.nullmultibulk);
+            unblockClientWaitingData(c);
+        }
+    }
+    return 0;
+}
+
+/* The client query buffer is an sds.c string that can end with a lot of
+ * free space not used, this function reclaims space if needed.
+ *
+ * The funciton always returns 0 as it never terminates the client. */
+int clientsCronResizeQueryBuffer(redisClient *c) {
+    size_t querybuf_size = sdsAllocSize(c->querybuf);
+    time_t idletime = server.unixtime - c->lastinteraction;
+
+    /* There are two conditions to resize the query buffer:
+     * 1) Query buffer is > BIG_ARG and too big for latest peak.
+     * 2) Client is inactive and the buffer is bigger than 1k. */
+    if (((querybuf_size > REDIS_MBULK_BIG_ARG) &&
+         (querybuf_size/(c->querybuf_peak+1)) > 2) ||
+         (querybuf_size > 1024 && idletime > 2))
+    {
+        /* Only resize the query buffer if it is actually wasting space. */
+        if (sdsavail(c->querybuf) > 1024) {
+            c->querybuf = sdsRemoveFreeSpace(c->querybuf);
+        }
+    }
+    /* Reset the peak again to capture the peak memory usage in the next
+     * cycle. */
+    c->querybuf_peak = 0;
+    return 0;
+}
+
+void clientsCron(void) {
+    /* Make sure to process at least 1/100 of clients per call.
+     * Since this function is called 10 times per second we are sure that
+     * in the worst case we process all the clients in 10 seconds.
+     * In normal conditions (a reasonable number of clients) we process
+     * all the clients in a shorter time. */
+    int numclients = listLength(server.clients);
+    int iterations = numclients/100;
+
+    if (iterations < 50)
+        iterations = (numclients < 50) ? numclients : 50;
+    while(listLength(server.clients) && iterations--) {
+        redisClient *c;
+        listNode *head;
+
+        /* Rotate the list, take the current head, process.
+         * This way if the client must be removed from the list it's the
+         * first element and we don't incur into O(N) computation. */
+        listRotate(server.clients);
+        head = listFirst(server.clients);
+        c = listNodeValue(head);
+        /* The following functions do different service checks on the client.
+         * The protocol is that they return non-zero if the client was
+         * terminated. */
+        if (clientsCronHandleTimeout(c)) continue;
+        if (clientsCronResizeQueryBuffer(c)) continue;
+    }
+}
+
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     int j, loops = server.cronloops;
     REDIS_NOTUSED(eventLoop);
@@ -628,6 +719,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * To access a global var is faster than calling time(NULL) */
     server.unixtime = time(NULL);
 
+    trackOperationsPerSecond();
+
     /* We have just 22 bits per object for LRU information.
      * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
      * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
@@ -685,9 +778,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
             zmalloc_used_memory());
     }
 
-    /* Close connections of timedout clients */
-    if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
-        closeTimedoutClients();
+    /* We need to do a few operations on clients asynchronously. */
+    clientsCron();
 
     /* Start a scheduled AOF rewrite if this was requested by the user while
      * a BGSAVE was in progress. */
@@ -765,9 +857,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * to detect transfer failures. */
     if (!(loops % 10)) replicationCron();
 
-    /* Run other sub-systems specific cron jobs */
-    if (server.cluster_enabled && !(loops % 10)) clusterCron();
-
     server.cronloops++;
     return 100;
 }
@@ -834,7 +923,11 @@ void createSharedObjects(void) {
     shared.slowscripterr = createObject(REDIS_STRING,sdsnew(
         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
     shared.bgsaveerr = createObject(REDIS_STRING,sdsnew(
-        "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Write commands are disabled. Please check Redis logs for details about the error.\r\n"));
+        "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n"));
+    shared.roslaveerr = createObject(REDIS_STRING,sdsnew(
+        "-READONLY You can't write against a read only slave.\r\n"));
+    shared.oomerr = createObject(REDIS_STRING,sdsnew(
+        "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
     shared.space = createObject(REDIS_STRING,sdsnew(" "));
     shared.colon = createObject(REDIS_STRING,sdsnew(":"));
     shared.plus = createObject(REDIS_STRING,sdsnew("+"));
@@ -870,6 +963,8 @@ void createSharedObjects(void) {
 }
 
 void initServerConfig() {
+    getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
+    server.runid[REDIS_RUN_ID_SIZE] = '\0';
     server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
     server.port = REDIS_SERVERPORT;
     server.bindaddr = NULL;
@@ -896,6 +991,7 @@ void initServerConfig() {
     server.aof_rewrite_base_size = 0;
     server.aof_rewrite_scheduled = 0;
     server.aof_last_fsync = time(NULL);
+    server.aof_delayed_fsync = 0;
     server.aof_fd = -1;
     server.aof_selected_db = -1; /* Make sure the first time will not match */
     server.aof_flush_postponed_start = 0;
@@ -910,8 +1006,8 @@ void initServerConfig() {
     server.maxmemory = 0;
     server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
     server.maxmemory_samples = 3;
-    server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
-    server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
+    server.hash_max_ziplist_entries = REDIS_HASH_MAX_ZIPLIST_ENTRIES;
+    server.hash_max_ziplist_value = REDIS_HASH_MAX_ZIPLIST_VALUE;
     server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
     server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
     server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
@@ -920,8 +1016,6 @@ void initServerConfig() {
     server.shutdown_asap = 0;
     server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
     server.repl_timeout = REDIS_REPL_TIMEOUT;
-    server.cluster_enabled = 0;
-    server.cluster.configfile = zstrdup("nodes.conf");
     server.lua_caller = NULL;
     server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
     server.lua_client = NULL;
@@ -941,6 +1035,7 @@ void initServerConfig() {
     server.repl_state = REDIS_REPL_NONE;
     server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
     server.repl_serve_stale_data = 1;
+    server.repl_slave_ro = 1;
     server.repl_down_since = -1;
 
     /* Client output buffer limits */
@@ -1089,6 +1184,10 @@ void initServer() {
     server.stat_peak_memory = 0;
     server.stat_fork_time = 0;
     server.stat_rejected_conn = 0;
+    memset(server.ops_sec_samples,0,sizeof(server.ops_sec_samples));
+    server.ops_sec_idx = 0;
+    server.ops_sec_last_sample_time = mstime();
+    server.ops_sec_last_sample_ops = 0;
     server.unixtime = time(NULL);
     server.lastbgsave_status = REDIS_OK;
     server.stop_writes_on_bgsave_err = 1;
@@ -1118,7 +1217,6 @@ void initServer() {
         server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
     }
 
-    if (server.cluster_enabled) clusterInit();
     scriptingInit();
     slowlogInit();
     bioInit();
@@ -1341,29 +1439,6 @@ int processCommand(redisClient *c) {
         return REDIS_OK;
     }
 
-    /* If cluster is enabled, redirect here */
-    if (server.cluster_enabled &&
-                !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
-        int hashslot;
-
-        if (server.cluster.state != REDIS_CLUSTER_OK) {
-            addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
-            return REDIS_OK;
-        } else {
-            int ask;
-            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask);
-            if (n == NULL) {
-                addReplyError(c,"Multi keys request invalid in cluster");
-                return REDIS_OK;
-            } else if (n != server.cluster.myself) {
-                addReplySds(c,sdscatprintf(sdsempty(),
-                    "-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED",
-                    hashslot,n->ip,n->port));
-                return REDIS_OK;
-            }
-        }
-    }
-
     /* Handle the maxmemory directive.
      *
      * First we try to free some memory if possible (if there are volatile
@@ -1372,8 +1447,7 @@ int processCommand(redisClient *c) {
     if (server.maxmemory) {
         int retval = freeMemoryIfNeeded();
         if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
-            addReplyError(c,
-                "command not allowed when used memory > 'maxmemory'");
+            addReply(c, shared.oomerr);
             return REDIS_OK;
         }
     }
@@ -1388,6 +1462,16 @@ int processCommand(redisClient *c) {
         return REDIS_OK;
     }
 
+    /* Don't accept wirte commands if this is a read only slave. But
+     * accept write commands if this is our master. */
+    if (server.masterhost && server.repl_slave_ro &&
+        !(c->flags & REDIS_MASTER) &&
+        c->cmd->flags & REDIS_CMD_WRITE)
+    {
+        addReply(c, shared.roslaveerr);
+        return REDIS_OK;
+    }
+
     /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
     if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
         &&
@@ -1585,6 +1669,7 @@ sds genRedisInfoString(char *section) {
             "multiplexing_api:%s\r\n"
             "gcc_version:%d.%d.%d\r\n"
             "process_id:%ld\r\n"
+            "run_id:%s\r\n"
             "tcp_port:%d\r\n"
             "uptime_in_seconds:%ld\r\n"
             "uptime_in_days:%ld\r\n"
@@ -1600,6 +1685,7 @@ sds genRedisInfoString(char *section) {
             0,0,0,
 #endif
             (long) getpid(),
+            server.runid,
             server.port,
             uptime,
             uptime/(3600*24),
@@ -1675,12 +1761,14 @@ sds genRedisInfoString(char *section) {
                 "aof_base_size:%lld\r\n"
                 "aof_pending_rewrite:%d\r\n"
                 "aof_buffer_length:%zu\r\n"
-                "aof_pending_bio_fsync:%llu\r\n",
+                "aof_pending_bio_fsync:%llu\r\n"
+                "aof_delayed_fsync:%lu\r\n",
                 (long long) server.aof_current_size,
                 (long long) server.aof_rewrite_base_size,
                 server.aof_rewrite_scheduled,
                 sdslen(server.aof_buf),
-                bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC));
+                bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC),
+                server.aof_delayed_fsync);
         }
 
         if (server.loading) {
@@ -1722,6 +1810,7 @@ sds genRedisInfoString(char *section) {
             "# Stats\r\n"
             "total_connections_received:%lld\r\n"
             "total_commands_processed:%lld\r\n"
+            "instantaneous_ops_per_sec:%lld\r\n"
             "rejected_connections:%lld\r\n"
             "expired_keys:%lld\r\n"
             "evicted_keys:%lld\r\n"
@@ -1732,6 +1821,7 @@ sds genRedisInfoString(char *section) {
             "latest_fork_usec:%lld\r\n",
             server.stat_numconnections,
             server.stat_numcommands,
+            getOperationsPerSecond(),
             server.stat_rejected_conn,
             server.stat_expiredkeys,
             server.stat_evictedkeys,
@@ -1847,15 +1937,6 @@ sds genRedisInfoString(char *section) {
         }
     }
 
-    /* Clusetr */
-    if (allsections || defsections || !strcasecmp(section,"cluster")) {
-        if (sections++) info = sdscat(info,"\r\n");
-        info = sdscatprintf(info,
-        "# Cluster\r\n"
-        "cluster_enabled:%d\r\n",
-        server.cluster_enabled);
-    }
-
     /* Key space */
     if (allsections || defsections || !strcasecmp(section,"keyspace")) {
         if (sections++) info = sdscat(info,"\r\n");
@@ -2107,8 +2188,8 @@ void daemonize(void) {
 }
 
 void version() {
-    printf("Redis server version %s (%s:%d)\n", REDIS_VERSION,
-        redisGitSHA1(), atoi(redisGitDirty()) > 0);
+    printf("Redis server v=%s sha=%s:%d malloc=%s\n", REDIS_VERSION,
+        redisGitSHA1(), atoi(redisGitDirty()) > 0, ZMALLOC_LIB);
     exit(0);
 }
 
@@ -2116,7 +2197,8 @@ void usage() {
     fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n");
     fprintf(stderr,"       ./redis-server - (read config from stdin)\n");
     fprintf(stderr,"       ./redis-server -v or --version\n");
-    fprintf(stderr,"       ./redis-server -h or --help\n\n");
+    fprintf(stderr,"       ./redis-server -h or --help\n");
+    fprintf(stderr,"       ./redis-server --test-memory <megabytes>\n\n");
     fprintf(stderr,"Examples:\n");
     fprintf(stderr,"       ./redis-server (run the server with default conf)\n");
     fprintf(stderr,"       ./redis-server /etc/redis/6379.conf\n");
@@ -2135,7 +2217,7 @@ void redisAsciiArt(void) {
         redisGitSHA1(),
         strtol(redisGitDirty(),NULL,10) > 0,
         (sizeof(long) == 8) ? "64" : "32",
-        server.cluster_enabled ? "cluster" : "stand alone",
+        "stand alone",
         server.port,
         (long) getpid()
     );
@@ -2172,6 +2254,8 @@ void setupSignalHandlers(void) {
     return;
 }
 
+void memtest(size_t megabytes, int passes);
+
 int main(int argc, char **argv) {
     long long start;
     struct timeval tv;
@@ -2193,6 +2277,17 @@ int main(int argc, char **argv) {
             strcmp(argv[1], "--version") == 0) version();
         if (strcmp(argv[1], "--help") == 0 ||
             strcmp(argv[1], "-h") == 0) usage();
+        if (strcmp(argv[1], "--test-memory") == 0) {
+            if (argc == 3) {
+                memtest(atoi(argv[2]),50);
+                exit(0);
+            } else {
+                fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
+                fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
+                exit(1);
+            }
+        }
+
         /* First argument is the config file name? */
         if (argv[j][0] != '-' || argv[j][1] != '-')
             configfile = argv[j++];