]> git.saurik.com Git - redis.git/blobdiff - src/redis.c
Number of iteration of --test-memory is now 300 (several minutes per gigabyte). Memte...
[redis.git] / src / redis.c
index 64df9073bc52912e3c5a69c254ea8f2f66438eef..4e0042960c01100c3daa4c64d4eb1574029c296e 100644 (file)
@@ -232,17 +232,16 @@ struct redisCommand redisCommandTable[] = {
     {"publish",publishCommand,3,"rpf",0,NULL,0,0,0,0,0},
     {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
     {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
-    {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
     {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
     {"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0},
-    {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
     {"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0},
     {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0},
     {"client",clientCommand,-2,"ar",0,NULL,0,0,0,0,0},
     {"eval",evalCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
     {"evalsha",evalShaCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
     {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
-    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}
+    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
+    {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}
 };
 
 /*============================ Utility functions ============================ */
@@ -506,17 +505,6 @@ dictType keylistDictType = {
     dictListDestructor          /* val destructor */
 };
 
-/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
- * clusterNode structures. */
-dictType clusterNodesDictType = {
-    dictSdsHash,                /* hash function */
-    NULL,                       /* key dup */
-    NULL,                       /* val dup */
-    dictSdsKeyCompare,          /* key compare */
-    dictSdsDestructor,          /* key destructor */
-    NULL                        /* val destructor */
-};
-
 int htNeedsResize(dict *dict) {
     long long size, used;
 
@@ -615,6 +603,110 @@ void updateLRUClock(void) {
                                                 REDIS_LRU_CLOCK_MAX;
 }
 
+
+/* Add a sample to the operations per second array of samples. */
+void trackOperationsPerSecond(void) {
+    long long t = mstime() - server.ops_sec_last_sample_time;
+    long long ops = server.stat_numcommands - server.ops_sec_last_sample_ops;
+    long long ops_sec;
+
+    ops_sec = t > 0 ? (ops*1000/t) : 0;
+
+    server.ops_sec_samples[server.ops_sec_idx] = ops_sec;
+    server.ops_sec_idx = (server.ops_sec_idx+1) % REDIS_OPS_SEC_SAMPLES;
+    server.ops_sec_last_sample_time = mstime();
+    server.ops_sec_last_sample_ops = server.stat_numcommands;
+}
+
+/* Return the mean of all the samples. */
+long long getOperationsPerSecond(void) {
+    int j;
+    long long sum = 0;
+
+    for (j = 0; j < REDIS_OPS_SEC_SAMPLES; j++)
+        sum += server.ops_sec_samples[j];
+    return sum / REDIS_OPS_SEC_SAMPLES;
+}
+
+/* Check for timeouts. Returns non-zero if the client was terminated */
+int clientsCronHandleTimeout(redisClient *c) {
+    time_t now = server.unixtime;
+
+    if (server.maxidletime &&
+        !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
+        !(c->flags & REDIS_MASTER) &&   /* no timeout for masters */
+        !(c->flags & REDIS_BLOCKED) &&  /* no timeout for BLPOP */
+        dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
+        listLength(c->pubsub_patterns) == 0 &&
+        (now - c->lastinteraction > server.maxidletime))
+    {
+        redisLog(REDIS_VERBOSE,"Closing idle client");
+        freeClient(c);
+        return 1;
+    } else if (c->flags & REDIS_BLOCKED) {
+        if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
+            addReply(c,shared.nullmultibulk);
+            unblockClientWaitingData(c);
+        }
+    }
+    return 0;
+}
+
+/* The client query buffer is an sds.c string that can end with a lot of
+ * free space not used, this function reclaims space if needed.
+ *
+ * The funciton always returns 0 as it never terminates the client. */
+int clientsCronResizeQueryBuffer(redisClient *c) {
+    size_t querybuf_size = sdsAllocSize(c->querybuf);
+    time_t idletime = server.unixtime - c->lastinteraction;
+
+    /* There are two conditions to resize the query buffer:
+     * 1) Query buffer is > BIG_ARG and too big for latest peak.
+     * 2) Client is inactive and the buffer is bigger than 1k. */
+    if (((querybuf_size > REDIS_MBULK_BIG_ARG) &&
+         (querybuf_size/(c->querybuf_peak+1)) > 2) ||
+         (querybuf_size > 1024 && idletime > 2))
+    {
+        /* Only resize the query buffer if it is actually wasting space. */
+        if (sdsavail(c->querybuf) > 1024) {
+            c->querybuf = sdsRemoveFreeSpace(c->querybuf);
+        }
+    }
+    /* Reset the peak again to capture the peak memory usage in the next
+     * cycle. */
+    c->querybuf_peak = 0;
+    return 0;
+}
+
+void clientsCron(void) {
+    /* Make sure to process at least 1/100 of clients per call.
+     * Since this function is called 10 times per second we are sure that
+     * in the worst case we process all the clients in 10 seconds.
+     * In normal conditions (a reasonable number of clients) we process
+     * all the clients in a shorter time. */
+    int numclients = listLength(server.clients);
+    int iterations = numclients/100;
+
+    if (iterations < 50)
+        iterations = (numclients < 50) ? numclients : 50;
+    while(listLength(server.clients) && iterations--) {
+        redisClient *c;
+        listNode *head;
+
+        /* Rotate the list, take the current head, process.
+         * This way if the client must be removed from the list it's the
+         * first element and we don't incur into O(N) computation. */
+        listRotate(server.clients);
+        head = listFirst(server.clients);
+        c = listNodeValue(head);
+        /* The following functions do different service checks on the client.
+         * The protocol is that they return non-zero if the client was
+         * terminated. */
+        if (clientsCronHandleTimeout(c)) continue;
+        if (clientsCronResizeQueryBuffer(c)) continue;
+    }
+}
+
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     int j, loops = server.cronloops;
     REDIS_NOTUSED(eventLoop);
@@ -627,6 +719,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * To access a global var is faster than calling time(NULL) */
     server.unixtime = time(NULL);
 
+    trackOperationsPerSecond();
+
     /* We have just 22 bits per object for LRU information.
      * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
      * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
@@ -684,9 +778,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
             zmalloc_used_memory());
     }
 
-    /* Close connections of timedout clients */
-    if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
-        closeTimedoutClients();
+    /* We need to do a few operations on clients asynchronously. */
+    clientsCron();
 
     /* Start a scheduled AOF rewrite if this was requested by the user while
      * a BGSAVE was in progress. */
@@ -764,9 +857,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * to detect transfer failures. */
     if (!(loops % 10)) replicationCron();
 
-    /* Run other sub-systems specific cron jobs */
-    if (server.cluster_enabled && !(loops % 10)) clusterCron();
-
     server.cronloops++;
     return 100;
 }
@@ -832,6 +922,8 @@ void createSharedObjects(void) {
         "-LOADING Redis is loading the dataset in memory\r\n"));
     shared.slowscripterr = createObject(REDIS_STRING,sdsnew(
         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
+    shared.bgsaveerr = createObject(REDIS_STRING,sdsnew(
+        "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Write commands are disabled. Please check Redis logs for details about the error.\r\n"));
     shared.space = createObject(REDIS_STRING,sdsnew(" "));
     shared.colon = createObject(REDIS_STRING,sdsnew(":"));
     shared.plus = createObject(REDIS_STRING,sdsnew("+"));
@@ -867,6 +959,8 @@ void createSharedObjects(void) {
 }
 
 void initServerConfig() {
+    getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
+    server.runid[REDIS_RUN_ID_SIZE] = '\0';
     server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
     server.port = REDIS_SERVERPORT;
     server.bindaddr = NULL;
@@ -907,8 +1001,8 @@ void initServerConfig() {
     server.maxmemory = 0;
     server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
     server.maxmemory_samples = 3;
-    server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
-    server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
+    server.hash_max_ziplist_entries = REDIS_HASH_MAX_ZIPLIST_ENTRIES;
+    server.hash_max_ziplist_value = REDIS_HASH_MAX_ZIPLIST_VALUE;
     server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
     server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
     server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
@@ -917,8 +1011,6 @@ void initServerConfig() {
     server.shutdown_asap = 0;
     server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
     server.repl_timeout = REDIS_REPL_TIMEOUT;
-    server.cluster_enabled = 0;
-    server.cluster.configfile = zstrdup("nodes.conf");
     server.lua_caller = NULL;
     server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
     server.lua_client = NULL;
@@ -1086,7 +1178,13 @@ void initServer() {
     server.stat_peak_memory = 0;
     server.stat_fork_time = 0;
     server.stat_rejected_conn = 0;
+    memset(server.ops_sec_samples,0,sizeof(server.ops_sec_samples));
+    server.ops_sec_idx = 0;
+    server.ops_sec_last_sample_time = mstime();
+    server.ops_sec_last_sample_ops = 0;
     server.unixtime = time(NULL);
+    server.lastbgsave_status = REDIS_OK;
+    server.stop_writes_on_bgsave_err = 1;
     aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
     if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
         acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
@@ -1113,7 +1211,6 @@ void initServer() {
         server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
     }
 
-    if (server.cluster_enabled) clusterInit();
     scriptingInit();
     slowlogInit();
     bioInit();
@@ -1247,7 +1344,7 @@ void call(redisClient *c, int flags) {
     /* Sent the command to clients in MONITOR mode, only if the commands are
      * not geneated from reading an AOF. */
     if (listLength(server.monitors) && !server.loading)
-        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
+        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
 
     /* Call the command. */
     redisOpArrayInit(&server.also_propagate);
@@ -1336,29 +1433,6 @@ int processCommand(redisClient *c) {
         return REDIS_OK;
     }
 
-    /* If cluster is enabled, redirect here */
-    if (server.cluster_enabled &&
-                !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
-        int hashslot;
-
-        if (server.cluster.state != REDIS_CLUSTER_OK) {
-            addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
-            return REDIS_OK;
-        } else {
-            int ask;
-            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask);
-            if (n == NULL) {
-                addReplyError(c,"Multi keys request invalid in cluster");
-                return REDIS_OK;
-            } else if (n != server.cluster.myself) {
-                addReplySds(c,sdscatprintf(sdsempty(),
-                    "-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED",
-                    hashslot,n->ip,n->port));
-                return REDIS_OK;
-            }
-        }
-    }
-
     /* Handle the maxmemory directive.
      *
      * First we try to free some memory if possible (if there are volatile
@@ -1373,6 +1447,16 @@ int processCommand(redisClient *c) {
         }
     }
 
+    /* Don't accept write commands if there are problems persisting on disk. */
+    if (server.stop_writes_on_bgsave_err &&
+        server.saveparamslen > 0
+        && server.lastbgsave_status == REDIS_ERR &&
+        c->cmd->flags & REDIS_CMD_WRITE)
+    {
+        addReply(c, shared.bgsaveerr);
+        return REDIS_OK;
+    }
+
     /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
     if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
         &&
@@ -1505,6 +1589,17 @@ void echoCommand(redisClient *c) {
     addReplyBulk(c,c->argv[1]);
 }
 
+void timeCommand(redisClient *c) {
+    struct timeval tv;
+
+    /* gettimeofday() can only fail if &tv is a bad addresss so we
+     * don't check for errors. */
+    gettimeofday(&tv,NULL);
+    addReplyMultiBulkLen(c,2);
+    addReplyBulkLongLong(c,tv.tv_sec);
+    addReplyBulkLongLong(c,tv.tv_usec);
+}
+
 /* Convert an amount of bytes into a human readable string in the form
  * of 100B, 2G, 100M, 4K, and so forth. */
 void bytesToHuman(char *s, unsigned long long n) {
@@ -1559,6 +1654,7 @@ sds genRedisInfoString(char *section) {
             "multiplexing_api:%s\r\n"
             "gcc_version:%d.%d.%d\r\n"
             "process_id:%ld\r\n"
+            "run_id:%s\r\n"
             "tcp_port:%d\r\n"
             "uptime_in_seconds:%ld\r\n"
             "uptime_in_days:%ld\r\n"
@@ -1574,6 +1670,7 @@ sds genRedisInfoString(char *section) {
             0,0,0,
 #endif
             (long) getpid(),
+            server.runid,
             server.port,
             uptime,
             uptime/(3600*24),
@@ -1633,12 +1730,14 @@ sds genRedisInfoString(char *section) {
             "changes_since_last_save:%lld\r\n"
             "bgsave_in_progress:%d\r\n"
             "last_save_time:%ld\r\n"
+            "last_bgsave_status:%s\r\n"
             "bgrewriteaof_in_progress:%d\r\n",
             server.loading,
             server.aof_state != REDIS_AOF_OFF,
             server.dirty,
             server.rdb_child_pid != -1,
             server.lastsave,
+            server.lastbgsave_status == REDIS_OK ? "ok" : "err",
             server.aof_child_pid != -1);
 
         if (server.aof_state != REDIS_AOF_OFF) {
@@ -1694,6 +1793,7 @@ sds genRedisInfoString(char *section) {
             "# Stats\r\n"
             "total_connections_received:%lld\r\n"
             "total_commands_processed:%lld\r\n"
+            "instantaneous_ops_per_sec:%lld\r\n"
             "rejected_connections:%lld\r\n"
             "expired_keys:%lld\r\n"
             "evicted_keys:%lld\r\n"
@@ -1704,6 +1804,7 @@ sds genRedisInfoString(char *section) {
             "latest_fork_usec:%lld\r\n",
             server.stat_numconnections,
             server.stat_numcommands,
+            getOperationsPerSecond(),
             server.stat_rejected_conn,
             server.stat_expiredkeys,
             server.stat_evictedkeys,
@@ -1819,15 +1920,6 @@ sds genRedisInfoString(char *section) {
         }
     }
 
-    /* Clusetr */
-    if (allsections || defsections || !strcasecmp(section,"cluster")) {
-        if (sections++) info = sdscat(info,"\r\n");
-        info = sdscatprintf(info,
-        "# Cluster\r\n"
-        "cluster_enabled:%d\r\n",
-        server.cluster_enabled);
-    }
-
     /* Key space */
     if (allsections || defsections || !strcasecmp(section,"keyspace")) {
         if (sections++) info = sdscat(info,"\r\n");
@@ -2088,7 +2180,8 @@ void usage() {
     fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n");
     fprintf(stderr,"       ./redis-server - (read config from stdin)\n");
     fprintf(stderr,"       ./redis-server -v or --version\n");
-    fprintf(stderr,"       ./redis-server -h or --help\n\n");
+    fprintf(stderr,"       ./redis-server -h or --help\n");
+    fprintf(stderr,"       ./redis-server --test-memory <megabytes>\n\n");
     fprintf(stderr,"Examples:\n");
     fprintf(stderr,"       ./redis-server (run the server with default conf)\n");
     fprintf(stderr,"       ./redis-server /etc/redis/6379.conf\n");
@@ -2107,7 +2200,7 @@ void redisAsciiArt(void) {
         redisGitSHA1(),
         strtol(redisGitDirty(),NULL,10) > 0,
         (sizeof(long) == 8) ? "64" : "32",
-        server.cluster_enabled ? "cluster" : "stand alone",
+        "stand alone",
         server.port,
         (long) getpid()
     );
@@ -2144,6 +2237,8 @@ void setupSignalHandlers(void) {
     return;
 }
 
+void memtest(size_t megabytes, int passes);
+
 int main(int argc, char **argv) {
     long long start;
     struct timeval tv;
@@ -2165,6 +2260,17 @@ int main(int argc, char **argv) {
             strcmp(argv[1], "--version") == 0) version();
         if (strcmp(argv[1], "--help") == 0 ||
             strcmp(argv[1], "-h") == 0) usage();
+        if (strcmp(argv[1], "--test-memory") == 0) {
+            if (argc == 3) {
+                memtest(atoi(argv[2]),300);
+                exit(0);
+            } else {
+                fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
+                fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
+                exit(1);
+            }
+        }
+
         /* First argument is the config file name? */
         if (argv[j][0] != '-' || argv[j][1] != '-')
             configfile = argv[j++];