X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/a48c8d873bc2a49f66ab398ebed51dc45764ca17..48a32944e6b5e30754701109ff23d85a418d3d4d:/src/redis.c?ds=sidebyside diff --git a/src/redis.c b/src/redis.c index 7446e72e..b4acd6ea 100644 --- a/src/redis.c +++ b/src/redis.c @@ -102,7 +102,10 @@ struct redisCommand *commandTable; * s: command not allowed in scripts. * R: random command. Command is not deterministic, that is, the same command * with the same arguments, with the same key space, may have different - * results. For instance SPOP and RANDOMKEY are two random commands. */ + * results. For instance SPOP and RANDOMKEY are two random commands. + * S: Sort command output array if called from script, so that the output + * is deterministic. + */ struct redisCommand redisCommandTable[] = { {"get",getCommand,2,"r",0,NULL,1,1,1,0,0}, {"set",setCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0}, @@ -145,13 +148,13 @@ struct redisCommand redisCommandTable[] = { {"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0}, {"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0}, {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0}, - {"sinter",sinterCommand,-2,"r",0,NULL,1,-1,1,0,0}, + {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0}, {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, - {"sunion",sunionCommand,-2,"r",0,NULL,1,-1,1,0,0}, + {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0}, {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, - {"sdiff",sdiffCommand,-2,"r",0,NULL,1,-1,1,0,0}, + {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0}, {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, - {"smembers",sinterCommand,2,"r",0,NULL,1,1,1,0,0}, + {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0}, {"zadd",zaddCommand,-4,"wm",0,NULL,1,1,1,0,0}, {"zincrby",zincrbyCommand,4,"wm",0,NULL,1,1,1,0,0}, {"zrem",zremCommand,-3,"w",0,NULL,1,1,1,0,0}, @@ -177,8 +180,8 @@ struct redisCommand redisCommandTable[] = { {"hincrbyfloat",hincrbyfloatCommand,4,"wm",0,NULL,1,1,1,0,0}, {"hdel",hdelCommand,-3,"w",0,NULL,1,1,1,0,0}, {"hlen",hlenCommand,2,"r",0,NULL,1,1,1,0,0}, - {"hkeys",hkeysCommand,2,"r",0,NULL,1,1,1,0,0}, - {"hvals",hvalsCommand,2,"r",0,NULL,1,1,1,0,0}, + {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0}, + {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0}, {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0}, {"hexists",hexistsCommand,3,"r",0,NULL,1,1,1,0,0}, {"incrby",incrbyCommand,3,"wm",0,NULL,1,1,1,0,0}, @@ -196,7 +199,7 @@ struct redisCommand redisCommandTable[] = { {"expireat",expireatCommand,3,"w",0,NULL,1,1,1,0,0}, {"pexpire",pexpireCommand,3,"w",0,NULL,1,1,1,0,0}, {"pexpireat",pexpireatCommand,3,"w",0,NULL,1,1,1,0,0}, - {"keys",keysCommand,2,"r",0,NULL,0,0,0,0,0}, + {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0}, {"dbsize",dbsizeCommand,1,"r",0,NULL,0,0,0,0,0}, {"auth",authCommand,2,"rs",0,NULL,0,0,0,0,0}, {"ping",pingCommand,1,"r",0,NULL,0,0,0,0,0}, @@ -213,7 +216,7 @@ struct redisCommand redisCommandTable[] = { {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0}, {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0}, - {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0}, + {"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0}, {"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0}, {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0}, {"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0}, @@ -754,6 +757,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * in order to guarantee a strict consistency. */ if (server.masterhost == NULL) activeExpireCycle(); + /* Close clients that need to be closed asynchronous */ + freeClientsInAsyncFreeQueue(); + /* Replication cron function -- used to reconnect to master and * to detect transfer failures. */ if (!(loops % 10)) replicationCron(); @@ -845,15 +851,21 @@ void createSharedObjects(void) { shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); - shared.mbulk3 = createStringObject("*3\r\n",4); - shared.mbulk4 = createStringObject("*4\r\n",4); + shared.del = createStringObject("DEL",3); for (j = 0; j < REDIS_SHARED_INTEGERS; j++) { shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j); shared.integers[j]->encoding = REDIS_ENCODING_INT; } + for (j = 0; j < REDIS_SHARED_BULKHDR_LEN; j++) { + shared.mbulkhdr[j] = createObject(REDIS_STRING, + sdscatprintf(sdsempty(),"*%d\r\n",j)); + shared.bulkhdr[j] = createObject(REDIS_STRING, + sdscatprintf(sdsempty(),"$%d\r\n",j)); + } } void initServerConfig() { + server.arch_bits = (sizeof(long) == 8) ? 64 : 32; server.port = REDIS_SERVERPORT; server.bindaddr = NULL; server.unixsocket = NULL; @@ -926,6 +938,17 @@ void initServerConfig() { server.repl_serve_stale_data = 1; server.repl_down_since = -1; + /* Client output buffer limits */ + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_bytes = 0; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_seconds = 0; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].hard_limit_bytes = 1024*1024*256; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_bytes = 1024*1024*64; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_seconds = 60; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].hard_limit_bytes = 1024*1024*32; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_bytes = 1024*1024*8; + server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_seconds = 60; + /* Double constants initialization */ R_Zero = 0.0; R_PosInf = 1.0/R_Zero; @@ -1002,6 +1025,7 @@ void initServer() { server.current_client = NULL; server.clients = listCreate(); + server.clients_to_close = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); server.unblocked_clients = listCreate(); @@ -1076,6 +1100,16 @@ void initServer() { } } + /* 32 bit instances are limited to 4GB of address space, so if there is + * no explicit limit in the user provided configuration we set a limit + * at 3.5GB using maxmemory with 'noeviction' policy'. This saves + * useless crashes of the Redis instance. */ + if (server.arch_bits == 32 && server.maxmemory == 0) { + redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now."); + server.maxmemory = 3584LL*(1024*1024); /* 3584 MB = 3.5 GB */ + server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION; + } + if (server.cluster_enabled) clusterInit(); scriptingInit(); slowlogInit(); @@ -1103,6 +1137,7 @@ void populateCommandTable(void) { case 'f': c->flags |= REDIS_CMD_FORCE_REPLICATION; break; case 's': c->flags |= REDIS_CMD_NOSCRIPT; break; case 'R': c->flags |= REDIS_CMD_RANDOM; break; + case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break; default: redisPanic("Unsupported command flag"); break; } f++; @@ -1141,24 +1176,34 @@ struct redisCommand *lookupCommandByCString(char *s) { } /* Call() is the core of Redis execution of a command */ -void call(redisClient *c) { +void call(redisClient *c, int flags) { long long dirty, start = ustime(), duration; dirty = server.dirty; c->cmd->proc(c); dirty = server.dirty-dirty; duration = ustime()-start; - c->cmd->microseconds += duration; - slowlogPushEntryIfNeeded(c->argv,c->argc,duration); - c->cmd->calls++; - - if (server.aof_state != REDIS_AOF_OFF && dirty > 0) - feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc); - if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) && - listLength(server.slaves)) - replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc); - if (listLength(server.monitors)) - replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); + + /* When EVAL is called loading the AOF we don't want commands called + * from Lua to go into the slowlog or to populate statistics. */ + if (server.loading && c->flags & REDIS_LUA_CLIENT) + flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS); + + if (flags & REDIS_CALL_SLOWLOG) + slowlogPushEntryIfNeeded(c->argv,c->argc,duration); + if (flags & REDIS_CALL_STATS) { + c->cmd->microseconds += duration; + c->cmd->calls++; + } + if (flags & REDIS_CALL_PROPAGATE) { + if (server.aof_state != REDIS_AOF_OFF && dirty > 0) + feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc); + if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) && + listLength(server.slaves)) + replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc); + if (listLength(server.monitors)) + replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); + } server.stat_numcommands++; } @@ -1230,12 +1275,13 @@ int processCommand(redisClient *c) { * First we try to free some memory if possible (if there are volatile * keys in the dataset). If there are not the only thing we can do * is returning an error. */ - if (server.maxmemory) freeMemoryIfNeeded(); - if (server.maxmemory && (c->cmd->flags & REDIS_CMD_DENYOOM) && - zmalloc_used_memory() > server.maxmemory) - { - addReplyError(c,"command not allowed when used memory > 'maxmemory'"); - return REDIS_OK; + if (server.maxmemory) { + int retval = freeMemoryIfNeeded(); + if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) { + addReplyError(c, + "command not allowed when used memory > 'maxmemory'"); + return REDIS_OK; + } } /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ @@ -1287,7 +1333,7 @@ int processCommand(redisClient *c) { queueMultiCommand(c); addReply(c,shared.queued); } else { - call(c); + call(c,REDIS_CALL_FULL); } return REDIS_OK; } @@ -1420,7 +1466,7 @@ sds genRedisInfoString(char *section) { "redis_version:%s\r\n" "redis_git_sha1:%s\r\n" "redis_git_dirty:%d\r\n" - "arch_bits:%s\r\n" + "arch_bits:%d\r\n" "multiplexing_api:%s\r\n" "gcc_version:%d.%d.%d\r\n" "process_id:%ld\r\n" @@ -1431,7 +1477,7 @@ sds genRedisInfoString(char *section) { REDIS_VERSION, redisGitSHA1(), strtol(redisGitDirty(),NULL,10) > 0, - (sizeof(long) == 8) ? "64" : "32", + server.arch_bits, aeGetApiName(), #ifdef __GNUC__ __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__, @@ -1450,7 +1496,7 @@ sds genRedisInfoString(char *section) { if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Clients\r\n" - "connected_clients:%d\r\n" + "connected_clients:%lu\r\n" "client_longest_output_list:%lu\r\n" "client_biggest_input_buf:%lu\r\n" "blocked_clients:%d\r\n", @@ -1565,7 +1611,7 @@ sds genRedisInfoString(char *section) { "keyspace_hits:%lld\r\n" "keyspace_misses:%lld\r\n" "pubsub_channels:%ld\r\n" - "pubsub_patterns:%u\r\n" + "pubsub_patterns:%lu\r\n" "latest_fork_usec:%lld\r\n", server.stat_numconnections, server.stat_numcommands, @@ -1618,7 +1664,7 @@ sds genRedisInfoString(char *section) { } } info = sdscatprintf(info, - "connected_slaves:%d\r\n", + "connected_slaves:%lu\r\n", listLength(server.slaves)); if (listLength(server.slaves)) { int slaveid = 0; @@ -1738,23 +1784,57 @@ void monitorCommand(redisClient *c) { /* ============================ Maxmemory directive ======================== */ /* This function gets called when 'maxmemory' is set on the config file to limit - * the max memory used by the server, and we are out of memory. - * This function will try to, in order: + * the max memory used by the server, before processing a command. + * + * The goal of the function is to free enough memory to keep Redis under the + * configured memory limit. * - * - Free objects from the free list - * - Try to remove keys with an EXPIRE set + * The function starts calculating how many bytes should be freed to keep + * Redis under the limit, and enters a loop selecting the best keys to + * evict accordingly to the configured policy. * - * It is not possible to free enough memory to reach used-memory < maxmemory - * the server will start refusing commands that will enlarge even more the - * memory usage. + * If all the bytes needed to return back under the limit were freed the + * function returns REDIS_OK, otherwise REDIS_ERR is returned, and the caller + * should block the execution of commands that will result in more memory + * used by the server. */ -void freeMemoryIfNeeded(void) { - /* Remove keys accordingly to the active policy as long as we are - * over the memory limit. */ - if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION) return; +int freeMemoryIfNeeded(void) { + size_t mem_used, mem_tofree, mem_freed; + int slaves = listLength(server.slaves); + + /* Remove the size of slaves output buffers and AOF buffer from the + * count of used memory. */ + mem_used = zmalloc_used_memory(); + if (slaves) { + listIter li; + listNode *ln; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + redisClient *slave = listNodeValue(ln); + unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave); + if (obuf_bytes > mem_used) + mem_used = 0; + else + mem_used -= obuf_bytes; + } + } + if (server.aof_state != REDIS_AOF_OFF) { + mem_used -= sdslen(server.aof_buf); + mem_used -= sdslen(server.aof_rewrite_buf); + } + + /* Check if we are over the memory limit. */ + if (mem_used <= server.maxmemory) return REDIS_OK; + + if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION) + return REDIS_ERR; /* We need to free memory, but policy forbids. */ - while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) { - int j, k, freed = 0; + /* Compute how much memory we need to free. */ + mem_tofree = mem_used - server.maxmemory; + mem_freed = 0; + while (mem_freed < mem_tofree) { + int j, k, keys_freed = 0; for (j = 0; j < server.dbnum; j++) { long bestval = 0; /* just to prevent warning */ @@ -1827,16 +1907,36 @@ void freeMemoryIfNeeded(void) { /* Finally remove the selected key. */ if (bestkey) { + long long delta; + robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); propagateExpire(db,keyobj); + /* We compute the amount of memory freed by dbDelete() alone. + * It is possible that actually the memory needed to propagate + * the DEL in AOF and replication link is greater than the one + * we are freeing removing the key, but we can't account for + * that otherwise we would never exit the loop. + * + * AOF and Output buffer memory will be freed eventually so + * we only care about memory used by the key space. */ + delta = (long long) zmalloc_used_memory(); dbDelete(db,keyobj); + delta -= (long long) zmalloc_used_memory(); + mem_freed += delta; server.stat_evictedkeys++; decrRefCount(keyobj); - freed++; + keys_freed++; + + /* When the memory to free starts to be big enough, we may + * start spending so much time here that is impossible to + * deliver data to the slaves fast enough, so we force the + * transmission here inside the loop. */ + if (slaves) flushSlavesOutputBuffers(); } } - if (!freed) return; /* nothing to free... */ + if (!keys_freed) return REDIS_ERR; /* nothing to free... */ } + return REDIS_OK; } /* =================================== Main! ================================ */ @@ -1959,7 +2059,7 @@ int main(int argc, char **argv) { long long start; struct timeval tv; - /* We need to initialize our libraries, and the server. */ + /* We need to initialize our libraries, and the server configuration. */ zmalloc_enable_thread_safeness(); srand(time(NULL)^getpid()); gettimeofday(&tv,NULL);