* s: command not allowed in scripts.
* R: random command. Command is not deterministic, that is, the same command
* with the same arguments, with the same key space, may have different
- * results. For instance SPOP and RANDOMKEY are two random commands. */
+ * results. For instance SPOP and RANDOMKEY are two random commands.
+ * S: Sort command output array if called from script, so that the output
+ * is deterministic.
+ */
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
{"set",setCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
{"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
{"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
{"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0},
- {"sinter",sinterCommand,-2,"r",0,NULL,1,-1,1,0,0},
+ {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
{"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
- {"sunion",sunionCommand,-2,"r",0,NULL,1,-1,1,0,0},
+ {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
{"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
- {"sdiff",sdiffCommand,-2,"r",0,NULL,1,-1,1,0,0},
+ {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
{"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
- {"smembers",sinterCommand,2,"r",0,NULL,1,1,1,0,0},
+ {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
{"zadd",zaddCommand,-4,"wm",0,NULL,1,1,1,0,0},
{"zincrby",zincrbyCommand,4,"wm",0,NULL,1,1,1,0,0},
{"zrem",zremCommand,-3,"w",0,NULL,1,1,1,0,0},
{"hincrbyfloat",hincrbyfloatCommand,4,"wm",0,NULL,1,1,1,0,0},
{"hdel",hdelCommand,-3,"w",0,NULL,1,1,1,0,0},
{"hlen",hlenCommand,2,"r",0,NULL,1,1,1,0,0},
- {"hkeys",hkeysCommand,2,"r",0,NULL,1,1,1,0,0},
- {"hvals",hvalsCommand,2,"r",0,NULL,1,1,1,0,0},
+ {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
+ {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
{"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
{"hexists",hexistsCommand,3,"r",0,NULL,1,1,1,0,0},
{"incrby",incrbyCommand,3,"wm",0,NULL,1,1,1,0,0},
{"expireat",expireatCommand,3,"w",0,NULL,1,1,1,0,0},
{"pexpire",pexpireCommand,3,"w",0,NULL,1,1,1,0,0},
{"pexpireat",pexpireatCommand,3,"w",0,NULL,1,1,1,0,0},
- {"keys",keysCommand,2,"r",0,NULL,0,0,0,0,0},
+ {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
{"dbsize",dbsizeCommand,1,"r",0,NULL,0,0,0,0,0},
{"auth",authCommand,2,"rs",0,NULL,0,0,0,0,0},
{"ping",pingCommand,1,"r",0,NULL,0,0,0,0,0},
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
{"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
{"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
- {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0},
+ {"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
{"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0},
{"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
{"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0},
{"publish",publishCommand,3,"rpf",0,NULL,0,0,0,0,0},
{"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
{"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
- {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
{"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
{"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0},
- {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
{"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0},
{"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0},
{"client",clientCommand,-2,"ar",0,NULL,0,0,0,0,0},
{"eval",evalCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
{"evalsha",evalShaCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
{"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
- {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}
+ {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
+ {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}
};
/*============================ Utility functions ============================ */
dictListDestructor /* val destructor */
};
-/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
- * clusterNode structures. */
-dictType clusterNodesDictType = {
- dictSdsHash, /* hash function */
- NULL, /* key dup */
- NULL, /* val dup */
- dictSdsKeyCompare, /* key compare */
- dictSdsDestructor, /* key destructor */
- NULL /* val destructor */
-};
-
int htNeedsResize(dict *dict) {
long long size, used;
REDIS_LRU_CLOCK_MAX;
}
+
+/* Add a sample to the operations per second array of samples.
+ *
+ * The sample is the number of commands counted in server.stat_numcommands
+ * since the previous call, scaled to a per-second rate using the
+ * milliseconds elapsed since the previous sample. It is written into the
+ * circular buffer server.ops_sec_samples, then the reference time and the
+ * reference command counter are updated for the next call. */
+void trackOperationsPerSecond(void) {
+    /* Read the clock once: the same instant closes this interval and opens
+     * the next one, so no time falls between two samples unaccounted. */
+    long long now = mstime();
+    long long t = now - server.ops_sec_last_sample_time;
+    long long ops = server.stat_numcommands - server.ops_sec_last_sample_ops;
+    long long ops_sec;
+
+    /* A non-positive interval yields a 0 sample instead of a division
+     * by zero (or a negative rate). */
+    ops_sec = t > 0 ? (ops*1000/t) : 0;
+
+    server.ops_sec_samples[server.ops_sec_idx] = ops_sec;
+    server.ops_sec_idx = (server.ops_sec_idx+1) % REDIS_OPS_SEC_SAMPLES;
+    server.ops_sec_last_sample_time = now;
+    server.ops_sec_last_sample_ops = server.stat_numcommands;
+}
+
+/* Return the arithmetic mean (integer division) of all the slots of
+ * server.ops_sec_samples, including slots that were never written since
+ * the array was zeroed. */
+long long getOperationsPerSecond(void) {
+    long long total = 0;
+    int idx = 0;
+
+    while (idx < REDIS_OPS_SEC_SAMPLES) {
+        total += server.ops_sec_samples[idx];
+        idx++;
+    }
+    return total / REDIS_OPS_SEC_SAMPLES;
+}
+
+/* Check the client for timeouts.
+ *
+ * A blocked client whose blocking timeout has passed receives a null
+ * multi bulk reply and is unblocked, but is never closed here.
+ * Any other client is closed when an idle limit is configured
+ * (server.maxidletime != 0), it is neither a slave nor a master, it has no
+ * Pub/Sub subscriptions, and it has been idle for longer than the limit.
+ *
+ * Returns 1 if the client was terminated, 0 otherwise. */
+int clientsCronHandleTimeout(redisClient *c) {
+    time_t now = server.unixtime;
+
+    /* Blocked clients are exempt from the idle timeout: only the timeout
+     * of the blocking operation applies to them. */
+    if (c->flags & REDIS_BLOCKED) {
+        if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
+            addReply(c,shared.nullmultibulk);
+            unblockClientWaitingData(c);
+        }
+        return 0;
+    }
+
+    if (!server.maxidletime) return 0;          /* idle timeout disabled */
+    if (c->flags & REDIS_SLAVE) return 0;       /* no timeout for slaves */
+    if (c->flags & REDIS_MASTER) return 0;      /* no timeout for masters */
+    if (dictSize(c->pubsub_channels) != 0) return 0; /* no timeout for pubsub */
+    if (listLength(c->pubsub_patterns) != 0) return 0;
+    if (now - c->lastinteraction <= server.maxidletime) return 0;
+
+    redisLog(REDIS_VERBOSE,"Closing idle client");
+    freeClient(c);
+    return 1;
+}
+
+/* The client query buffer is an sds string that may end up holding a lot
+ * of unused free space; this function reclaims that space when it is
+ * worth it, and resets the recorded peak so that the next cycle measures
+ * a fresh one.
+ *
+ * The function always returns 0 as it never terminates the client. */
+int clientsCronResizeQueryBuffer(redisClient *c) {
+    size_t allocated = sdsAllocSize(c->querybuf);
+    time_t idle = server.unixtime - c->lastinteraction;
+    int oversized, stale;
+
+    /* Condition 1: the buffer is > BIG_ARG and more than twice as large as
+     * the latest peak requires. */
+    oversized = (allocated > REDIS_MBULK_BIG_ARG) &&
+                ((allocated/(c->querybuf_peak+1)) > 2);
+    /* Condition 2: the client is inactive and the buffer is bigger
+     * than 1k. */
+    stale = (allocated > 1024) && (idle > 2);
+
+    /* Shrink only when the buffer is actually wasting space. */
+    if ((oversized || stale) && sdsavail(c->querybuf) > 1024)
+        c->querybuf = sdsRemoveFreeSpace(c->querybuf);
+
+    /* Start capturing the peak again for the next cycle. */
+    c->querybuf_peak = 0;
+    return 0;
+}
+
+/* Run the periodic per-client service checks on a slice of the clients.
+ *
+ * Each call processes 1/100 of the connected clients, but never fewer than
+ * 50 (or all of them when fewer than 50 are connected). Called 10 times
+ * per second, this visits every client within 10 seconds in the worst
+ * case, and much sooner with a reasonable number of clients. */
+void clientsCron(void) {
+    int total = listLength(server.clients);
+    int budget = total/100;
+
+    if (budget < 50) {
+        if (total < 50)
+            budget = total;
+        else
+            budget = 50;
+    }
+
+    for (; listLength(server.clients) && budget != 0; budget--) {
+        redisClient *c;
+
+        /* Rotate the list and work on the new head: if the client has to
+         * be removed from the list it is the first element, so removal
+         * does not cost an O(N) scan. */
+        listRotate(server.clients);
+        c = listNodeValue(listFirst(server.clients));
+
+        /* Each check returns non-zero when it terminated the client, in
+         * which case the client must not be touched again. */
+        if (clientsCronHandleTimeout(c)) continue;
+        if (clientsCronResizeQueryBuffer(c)) continue;
+    }
+}
+
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j, loops = server.cronloops;
REDIS_NOTUSED(eventLoop);
* To access a global var is faster than calling time(NULL) */
server.unixtime = time(NULL);
+ trackOperationsPerSecond();
+
/* We have just 22 bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
* 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
zmalloc_used_memory());
}
- /* Close connections of timedout clients */
- if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
- closeTimedoutClients();
+ /* We need to do a few operations on clients asynchronously. */
+ clientsCron();
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
* in order to guarantee a strict consistency. */
if (server.masterhost == NULL) activeExpireCycle();
+ /* Close clients that need to be closed asynchronously. */
+ freeClientsInAsyncFreeQueue();
+
/* Replication cron function -- used to reconnect to master and
* to detect transfer failures. */
if (!(loops % 10)) replicationCron();
- /* Run other sub-systems specific cron jobs */
- if (server.cluster_enabled && !(loops % 10)) clusterCron();
-
server.cronloops++;
return 100;
}
"-LOADING Redis is loading the dataset in memory\r\n"));
shared.slowscripterr = createObject(REDIS_STRING,sdsnew(
"-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
+ shared.bgsaveerr = createObject(REDIS_STRING,sdsnew(
+ "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Write commands are disabled. Please check Redis logs for details about the error.\r\n"));
shared.space = createObject(REDIS_STRING,sdsnew(" "));
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
- shared.mbulk3 = createStringObject("*3\r\n",4);
- shared.mbulk4 = createStringObject("*4\r\n",4);
+ shared.del = createStringObject("DEL",3);
+ shared.rpop = createStringObject("RPOP",4);
+ shared.lpop = createStringObject("LPOP",4);
for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
shared.integers[j]->encoding = REDIS_ENCODING_INT;
}
+ for (j = 0; j < REDIS_SHARED_BULKHDR_LEN; j++) {
+ shared.mbulkhdr[j] = createObject(REDIS_STRING,
+ sdscatprintf(sdsempty(),"*%d\r\n",j));
+ shared.bulkhdr[j] = createObject(REDIS_STRING,
+ sdscatprintf(sdsempty(),"$%d\r\n",j));
+ }
}
void initServerConfig() {
+ getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
+ server.runid[REDIS_RUN_ID_SIZE] = '\0';
+ server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
server.port = REDIS_SERVERPORT;
server.bindaddr = NULL;
server.unixsocket = NULL;
server.maxmemory = 0;
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
server.maxmemory_samples = 3;
- server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
- server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
+ server.hash_max_ziplist_entries = REDIS_HASH_MAX_ZIPLIST_ENTRIES;
+ server.hash_max_ziplist_value = REDIS_HASH_MAX_ZIPLIST_VALUE;
server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
server.shutdown_asap = 0;
server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
server.repl_timeout = REDIS_REPL_TIMEOUT;
- server.cluster_enabled = 0;
- server.cluster.configfile = zstrdup("nodes.conf");
server.lua_caller = NULL;
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
server.lua_client = NULL;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_bytes = 0;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_seconds = 0;
- server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].hard_limit_bytes = 0;
- server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_bytes = 1024*1024*256;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].hard_limit_bytes = 1024*1024*256;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_bytes = 1024*1024*64;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_seconds = 60;
- server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].hard_limit_bytes = 1024*1024*256;
- server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_bytes = 1024*1024*32;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].hard_limit_bytes = 1024*1024*32;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_bytes = 1024*1024*8;
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_seconds = 60;
/* Double constants initialization */
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
+ server.lpushCommand = lookupCommandByCString("lpush");
/* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
server.stat_peak_memory = 0;
server.stat_fork_time = 0;
server.stat_rejected_conn = 0;
+ memset(server.ops_sec_samples,0,sizeof(server.ops_sec_samples));
+ server.ops_sec_idx = 0;
+ server.ops_sec_last_sample_time = mstime();
+ server.ops_sec_last_sample_ops = 0;
server.unixtime = time(NULL);
+ server.lastbgsave_status = REDIS_OK;
+ server.stop_writes_on_bgsave_err = 1;
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
}
}
- if (server.cluster_enabled) clusterInit();
+ /* 32 bit instances are limited to 4GB of address space, so if there is
+ * no explicit limit in the user provided configuration we set a limit
+ * at 3.5GB using maxmemory with 'noeviction' policy. This saves
+ * useless crashes of the Redis instance. */
+ if (server.arch_bits == 32 && server.maxmemory == 0) {
+ redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now.");
+ server.maxmemory = 3584LL*(1024*1024); /* 3584 MB = 3.5 GB */
+ server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
+ }
+
scriptingInit();
slowlogInit();
bioInit();
case 'f': c->flags |= REDIS_CMD_FORCE_REPLICATION; break;
case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
case 'R': c->flags |= REDIS_CMD_RANDOM; break;
+ case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
default: redisPanic("Unsupported command flag"); break;
}
f++;
}
}
+/* ========================== Redis OP Array API ============================ */
+
+/* Put 'oa' in the empty state: zero stored operations and no backing
+ * array. Nothing previously referenced by 'oa' is released here. */
+void redisOpArrayInit(redisOpArray *oa) {
+    oa->numops = 0;
+    oa->ops = NULL;
+}
+
+/* Append one operation (command, database id, argument vector and
+ * propagation target) at the end of 'oa', growing the backing array by
+ * one slot. The 'argv' pointer is stored as is, not copied.
+ *
+ * Returns the number of operations stored after the append. */
+int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
+                       robj **argv, int argc, int target)
+{
+    int count = oa->numops+1;
+    redisOp *slot;
+
+    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*count);
+    slot = &oa->ops[oa->numops];
+    slot->cmd = cmd;
+    slot->dbid = dbid;
+    slot->argv = argv;
+    slot->argc = argc;
+    slot->target = target;
+    oa->numops = count;
+    return count;
+}
+
+/* Release everything held by 'oa': for every stored operation the
+ * reference count of each argument is decremented and the argument vector
+ * is freed, then the backing array itself is freed.
+ *
+ * On return 'oa' is back in the empty state (numops == 0, ops == NULL), so
+ * freeing it again or appending to it without calling redisOpArrayInit()
+ * first does not touch released memory. */
+void redisOpArrayFree(redisOpArray *oa) {
+    while(oa->numops) {
+        int j;
+        redisOp *op;
+
+        oa->numops--;
+        op = oa->ops+oa->numops;
+        for (j = 0; j < op->argc; j++)
+            decrRefCount(op->argv[j]);
+        zfree(op->argv);
+    }
+    zfree(oa->ops);
+    oa->ops = NULL; /* do not leave a dangling pointer behind */
+}
+
/* ====================== Commands lookup and execution ===================== */
struct redisCommand *lookupCommand(sds name) {
return cmd;
}
+/* Propagate the specified command (in the context of the specified
+ * database id) to the AOF and to the slaves.
+ *
+ * 'flags' is a bitwise OR of:
+ * + REDIS_PROPAGATE_NONE (no propagation of command at all)
+ * + REDIS_PROPAGATE_AOF (propagate into the AOF file if it is enabled)
+ * + REDIS_PROPAGATE_REPL (propagate into the replication link)
+ */
+void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+               int flags)
+{
+    int to_aof = (flags & REDIS_PROPAGATE_AOF) != 0;
+    int to_repl = (flags & REDIS_PROPAGATE_REPL) != 0;
+
+    /* The AOF is fed only while it is not switched off. */
+    if (to_aof && server.aof_state != REDIS_AOF_OFF)
+        feedAppendOnlyFile(cmd,dbid,argv,argc);
+    /* The replication link is fed only when at least one slave exists. */
+    if (to_repl && listLength(server.slaves))
+        replicationFeedSlaves(server.slaves,dbid,argv,argc);
+}
+
+/* Used inside commands to schedule the propagation of additional commands
+ * after the current command is propagated to AOF / Replication.
+ *
+ * The operation is only queued here, by appending it to
+ * server.also_propagate; call() later passes every queued operation to
+ * propagate() with 'target' as its flags argument, then frees the queue.
+ * Freeing the queue decrements the reference count of every element of
+ * 'argv' and frees 'argv' itself, so the caller gives up ownership of
+ * both. */
+void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+ int target)
+{
+ redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target);
+}
+
/* Call() is the core of Redis execution of a command */
-void call(redisClient *c) {
+void call(redisClient *c, int flags) {
long long dirty, start = ustime(), duration;
+ /* Send the command to clients in MONITOR mode, only if the commands are
+ * not generated from reading an AOF. */
+ if (listLength(server.monitors) && !server.loading)
+ replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
+
+ /* Call the command. The also_propagate queue is reset first so that it
+ * only collects operations scheduled by this command. */
+ redisOpArrayInit(&server.also_propagate);
dirty = server.dirty;
c->cmd->proc(c);
dirty = server.dirty-dirty;
duration = ustime()-start;
- c->cmd->microseconds += duration;
- slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
- c->cmd->calls++;
-
- if (server.aof_state != REDIS_AOF_OFF && dirty > 0)
- feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
- if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
- listLength(server.slaves))
- replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
- if (listLength(server.monitors))
- replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
+
+ /* When EVAL is called loading the AOF we don't want commands called
+ * from Lua to go into the slowlog or to populate statistics. */
+ if (server.loading && c->flags & REDIS_LUA_CLIENT)
+ flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);
+
+ /* Log the command into the Slow log if needed, and populate the
+ * per-command statistics that we show in INFO commandstats. */
+ if (flags & REDIS_CALL_SLOWLOG)
+ slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
+ if (flags & REDIS_CALL_STATS) {
+ c->cmd->microseconds += duration;
+ c->cmd->calls++;
+ }
+
+ /* Propagate the command into the AOF and replication link.
+ * NOTE: the local 'flags' declared inside this block shadows the
+ * function parameter of the same name; it holds REDIS_PROPAGATE_* bits,
+ * not the REDIS_CALL_* bits tested in the condition. */
+ if (flags & REDIS_CALL_PROPAGATE) {
+ int flags = REDIS_PROPAGATE_NONE;
+
+ if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
+ flags |= REDIS_PROPAGATE_REPL;
+ if (dirty)
+ flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
+ if (flags != REDIS_PROPAGATE_NONE)
+ propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
+ }
+ /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
+ * PUSH command. Operations queued in server.also_propagate are
+ * propagated here, in queue order, and the queue is then freed. */
+ if (server.also_propagate.numops) {
+ int j;
+ redisOp *rop;
+
+ for (j = 0; j < server.also_propagate.numops; j++) {
+ rop = &server.also_propagate.ops[j];
+ propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
+ }
+ redisOpArrayFree(&server.also_propagate);
+ }
server.stat_numcommands++;
}
return REDIS_OK;
}
- /* If cluster is enabled, redirect here */
- if (server.cluster_enabled &&
- !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
- int hashslot;
-
- if (server.cluster.state != REDIS_CLUSTER_OK) {
- addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
- return REDIS_OK;
- } else {
- int ask;
- clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask);
- if (n == NULL) {
- addReplyError(c,"Multi keys request invalid in cluster");
- return REDIS_OK;
- } else if (n != server.cluster.myself) {
- addReplySds(c,sdscatprintf(sdsempty(),
- "-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED",
- hashslot,n->ip,n->port));
- return REDIS_OK;
- }
- }
- }
-
/* Handle the maxmemory directive.
*
* First we try to free some memory if possible (if there are volatile
* keys in the dataset). If there are not the only thing we can do
* is returning an error. */
- if (server.maxmemory) freeMemoryIfNeeded();
- if (server.maxmemory && (c->cmd->flags & REDIS_CMD_DENYOOM) &&
- zmalloc_used_memory() > server.maxmemory)
+ if (server.maxmemory) {
+ int retval = freeMemoryIfNeeded();
+ if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
+ addReplyError(c,
+ "command not allowed when used memory > 'maxmemory'");
+ return REDIS_OK;
+ }
+ }
+
+ /* Don't accept write commands if there are problems persisting on disk. */
+ if (server.stop_writes_on_bgsave_err &&
+ server.saveparamslen > 0
+ && server.lastbgsave_status == REDIS_ERR &&
+ c->cmd->flags & REDIS_CMD_WRITE)
{
- addReplyError(c,"command not allowed when used memory > 'maxmemory'");
+ addReply(c, shared.bgsaveerr);
return REDIS_OK;
}
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
- call(c);
+ call(c,REDIS_CALL_FULL);
}
return REDIS_OK;
}
addReplyBulk(c,c->argv[1]);
}
+/* TIME command: reply with a two element multi bulk holding the seconds
+ * and the microseconds fields obtained from gettimeofday(). */
+void timeCommand(redisClient *c) {
+    struct timeval now;
+
+    /* gettimeofday() can only fail if the timeval pointer is a bad address,
+     * so the return value is not checked. */
+    gettimeofday(&now,NULL);
+
+    addReplyMultiBulkLen(c,2);
+    addReplyBulkLongLong(c,now.tv_sec);
+    addReplyBulkLongLong(c,now.tv_usec);
+}
+
/* Convert an amount of bytes into a human readable string in the form
* of 100B, 2G, 100M, 4K, and so forth. */
void bytesToHuman(char *s, unsigned long long n) {
"redis_version:%s\r\n"
"redis_git_sha1:%s\r\n"
"redis_git_dirty:%d\r\n"
- "arch_bits:%s\r\n"
+ "arch_bits:%d\r\n"
"multiplexing_api:%s\r\n"
"gcc_version:%d.%d.%d\r\n"
"process_id:%ld\r\n"
+ "run_id:%s\r\n"
"tcp_port:%d\r\n"
"uptime_in_seconds:%ld\r\n"
"uptime_in_days:%ld\r\n"
REDIS_VERSION,
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
- (sizeof(long) == 8) ? "64" : "32",
+ server.arch_bits,
aeGetApiName(),
#ifdef __GNUC__
__GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
0,0,0,
#endif
(long) getpid(),
+ server.runid,
server.port,
uptime,
uptime/(3600*24),
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
"# Clients\r\n"
- "connected_clients:%d\r\n"
+ "connected_clients:%lu\r\n"
"client_longest_output_list:%lu\r\n"
"client_biggest_input_buf:%lu\r\n"
"blocked_clients:%d\r\n",
"changes_since_last_save:%lld\r\n"
"bgsave_in_progress:%d\r\n"
"last_save_time:%ld\r\n"
+ "last_bgsave_status:%s\r\n"
"bgrewriteaof_in_progress:%d\r\n",
server.loading,
server.aof_state != REDIS_AOF_OFF,
server.dirty,
server.rdb_child_pid != -1,
server.lastsave,
+ server.lastbgsave_status == REDIS_OK ? "ok" : "err",
server.aof_child_pid != -1);
if (server.aof_state != REDIS_AOF_OFF) {
"# Stats\r\n"
"total_connections_received:%lld\r\n"
"total_commands_processed:%lld\r\n"
+ "instantaneous_ops_per_sec:%lld\r\n"
"rejected_connections:%lld\r\n"
"expired_keys:%lld\r\n"
"evicted_keys:%lld\r\n"
"keyspace_hits:%lld\r\n"
"keyspace_misses:%lld\r\n"
"pubsub_channels:%ld\r\n"
- "pubsub_patterns:%u\r\n"
+ "pubsub_patterns:%lu\r\n"
"latest_fork_usec:%lld\r\n",
server.stat_numconnections,
server.stat_numcommands,
+ getOperationsPerSecond(),
server.stat_rejected_conn,
server.stat_expiredkeys,
server.stat_evictedkeys,
}
}
info = sdscatprintf(info,
- "connected_slaves:%d\r\n",
+ "connected_slaves:%lu\r\n",
listLength(server.slaves));
if (listLength(server.slaves)) {
int slaveid = 0;
}
}
- /* Clusetr */
- if (allsections || defsections || !strcasecmp(section,"cluster")) {
- if (sections++) info = sdscat(info,"\r\n");
- info = sdscatprintf(info,
- "# Cluster\r\n"
- "cluster_enabled:%d\r\n",
- server.cluster_enabled);
- }
-
/* Key space */
if (allsections || defsections || !strcasecmp(section,"keyspace")) {
if (sections++) info = sdscat(info,"\r\n");
/* ============================ Maxmemory directive ======================== */
/* This function gets called when 'maxmemory' is set on the config file to limit
- * the max memory used by the server, and we are out of memory.
- * This function will try to, in order:
+ * the max memory used by the server, before processing a command.
*
- * - Free objects from the free list
- * - Try to remove keys with an EXPIRE set
+ * The goal of the function is to free enough memory to keep Redis under the
+ * configured memory limit.
*
- * It is not possible to free enough memory to reach used-memory < maxmemory
- * the server will start refusing commands that will enlarge even more the
- * memory usage.
+ * The function starts calculating how many bytes should be freed to keep
+ * Redis under the limit, and enters a loop selecting the best keys to
+ * evict accordingly to the configured policy.
+ *
+ * If all the bytes needed to return back under the limit were freed the
+ * function returns REDIS_OK, otherwise REDIS_ERR is returned, and the caller
+ * should block the execution of commands that will result in more memory
+ * used by the server.
*/
-void freeMemoryIfNeeded(void) {
- /* Remove keys accordingly to the active policy as long as we are
- * over the memory limit. */
- if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION) return;
+int freeMemoryIfNeeded(void) {
+ size_t mem_used, mem_tofree, mem_freed;
+ int slaves = listLength(server.slaves);
+
+ /* Remove the size of slaves output buffers and AOF buffer from the
+ * count of used memory. */
+ mem_used = zmalloc_used_memory();
+ if (slaves) {
+ listIter li;
+ listNode *ln;
+
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ redisClient *slave = listNodeValue(ln);
+ unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
+ if (obuf_bytes > mem_used)
+ mem_used = 0;
+ else
+ mem_used -= obuf_bytes;
+ }
+ }
+ if (server.aof_state != REDIS_AOF_OFF) {
+ mem_used -= sdslen(server.aof_buf);
+ mem_used -= sdslen(server.aof_rewrite_buf);
+ }
+
+ /* Check if we are over the memory limit. */
+ if (mem_used <= server.maxmemory) return REDIS_OK;
- while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
- int j, k, freed = 0;
+ if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
+ return REDIS_ERR; /* We need to free memory, but policy forbids. */
+
+ /* Compute how much memory we need to free. */
+ mem_tofree = mem_used - server.maxmemory;
+ mem_freed = 0;
+ while (mem_freed < mem_tofree) {
+ int j, k, keys_freed = 0;
for (j = 0; j < server.dbnum; j++) {
long bestval = 0; /* just to prevent warning */
/* Finally remove the selected key. */
if (bestkey) {
+ long long delta;
+
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj);
+ /* We compute the amount of memory freed by dbDelete() alone.
+ * It is possible that actually the memory needed to propagate
+ * the DEL in AOF and replication link is greater than the one
+ * we are freeing removing the key, but we can't account for
+ * that otherwise we would never exit the loop.
+ *
+ * AOF and Output buffer memory will be freed eventually so
+ * we only care about memory used by the key space. */
+ delta = (long long) zmalloc_used_memory();
dbDelete(db,keyobj);
+ delta -= (long long) zmalloc_used_memory();
+ mem_freed += delta;
server.stat_evictedkeys++;
decrRefCount(keyobj);
- freed++;
+ keys_freed++;
+
+ /* When the memory to free starts to be big enough, we may
+ * start spending so much time here that is impossible to
+ * deliver data to the slaves fast enough, so we force the
+ * transmission here inside the loop. */
+ if (slaves) flushSlavesOutputBuffers();
}
}
- if (!freed) return; /* nothing to free... */
+ if (!keys_freed) return REDIS_ERR; /* nothing to free... */
}
+ return REDIS_OK;
}
/* =================================== Main! ================================ */
fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf] [options]\n");
fprintf(stderr," ./redis-server - (read config from stdin)\n");
fprintf(stderr," ./redis-server -v or --version\n");
- fprintf(stderr," ./redis-server -h or --help\n\n");
+ fprintf(stderr," ./redis-server -h or --help\n");
+ fprintf(stderr," ./redis-server --test-memory <megabytes>\n\n");
fprintf(stderr,"Examples:\n");
fprintf(stderr," ./redis-server (run the server with default conf)\n");
fprintf(stderr," ./redis-server /etc/redis/6379.conf\n");
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
(sizeof(long) == 8) ? "64" : "32",
- server.cluster_enabled ? "cluster" : "stand alone",
+ "stand alone",
server.port,
(long) getpid()
);
return;
}
+void memtest(size_t megabytes, int passes);
+
int main(int argc, char **argv) {
long long start;
struct timeval tv;
strcmp(argv[1], "--version") == 0) version();
if (strcmp(argv[1], "--help") == 0 ||
strcmp(argv[1], "-h") == 0) usage();
+ if (strcmp(argv[1], "--test-memory") == 0) {
+ if (argc == 3) {
+ memtest(atoi(argv[2]),300);
+ exit(0);
+ } else {
+ fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
+ fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
+ exit(1);
+ }
+ }
+
/* First argument is the config file name? */
if (argv[j][0] != '-' || argv[j][1] != '-')
configfile = argv[j++];