X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/8562798308391d489016b3995d438b6187b5980a..749817b7c3873f66c42e75a2bdc9b6a5aad5d67f:/src/redis.c diff --git a/src/redis.c b/src/redis.c index 3294eea4..dacf4712 100644 --- a/src/redis.c +++ b/src/redis.c @@ -232,10 +232,8 @@ struct redisCommand redisCommandTable[] = { {"publish",publishCommand,3,"rpf",0,NULL,0,0,0,0,0}, {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0}, {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, - {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}, {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0}, {"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0}, - {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0}, {"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0}, {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0}, {"client",clientCommand,-2,"ar",0,NULL,0,0,0,0,0}, @@ -507,17 +505,6 @@ dictType keylistDictType = { dictListDestructor /* val destructor */ }; -/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to - * clusterNode structures. */ -dictType clusterNodesDictType = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - NULL /* val destructor */ -}; - int htNeedsResize(dict *dict) { long long size, used; @@ -641,6 +628,76 @@ long long getOperationsPerSecond(void) { return sum / REDIS_OPS_SEC_SAMPLES; } +void clientsCronHandleTimeout(redisClient *c) { + time_t now = server.unixtime; + + if (server.maxidletime && + !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ + !(c->flags & REDIS_MASTER) && /* no timeout for masters */ + !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */ + dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */ + listLength(c->pubsub_patterns) == 0 && + (now - c->lastinteraction > server.maxidletime)) + { + redisLog(REDIS_VERBOSE,"Closing idle client"); + freeClient(c); + } else if (c->flags & REDIS_BLOCKED) { + if (c->bpop.timeout != 0 && c->bpop.timeout < now) { + addReply(c,shared.nullmultibulk); + unblockClientWaitingData(c); + } + } +} + +/* The client query buffer is an sds.c string that can end with a lot of + * free space not used, this function reclaims space if needed. */ +void clientsCronResizeQueryBuffer(redisClient *c) { + size_t querybuf_size = sdsAllocSize(c->querybuf); + time_t idletime = server.unixtime - c->lastinteraction; + + /* There are two conditions to resize the query buffer: + * 1) Query buffer is > BIG_ARG and too big for latest peak. + * 2) Client is inactive and the buffer is bigger than 1k. */ + if (((querybuf_size > REDIS_MBULK_BIG_ARG) && + (querybuf_size/(c->querybuf_peak+1)) > 2) || + (querybuf_size > 1024 && idletime > 2)) + { + /* Only resize the query buffer if it is actually wasting space. */ + if (sdsavail(c->querybuf) > 1024) { + c->querybuf = sdsRemoveFreeSpace(c->querybuf); + } + } + /* Reset the peak again to capture the peak memory usage in the next + * cycle. */ + c->querybuf_peak = 0; +} + +void clientsCron(void) { + /* Make sure to process at least 1/100 of clients per call. + * Since this function is called 10 times per second we are sure that + * in the worst case we process all the clients in 10 seconds. + * In normal conditions (a reasonable number of clients) we process + * all the clients in a shorter time. */ + int numclients = listLength(server.clients); + int iterations = numclients/100; + + if (iterations < 50) + iterations = (numclients < 50) ? numclients : 50; + while(listLength(server.clients) && iterations--) { + redisClient *c; + listNode *head; + + /* Rotate the list, take the current head, process. + * This way if the client must be removed from the list it's the + * first element and we don't incur into O(N) computation. */ + listRotate(server.clients); + head = listFirst(server.clients); + c = listNodeValue(head); + clientsCronHandleTimeout(c); + clientsCronResizeQueryBuffer(c); + } +} + int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j, loops = server.cronloops; REDIS_NOTUSED(eventLoop); @@ -712,9 +769,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { zmalloc_used_memory()); } - /* Close connections of timedout clients */ - if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients) - closeTimedoutClients(); + /* We need to do a few operations on clients asynchronously. */ + clientsCron(); /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ @@ -792,9 +848,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * to detect transfer failures. */ if (!(loops % 10)) replicationCron(); - /* Run other sub-systems specific cron jobs */ - if (server.cluster_enabled && !(loops % 10)) clusterCron(); - server.cronloops++; return 100; } @@ -949,8 +1002,6 @@ void initServerConfig() { server.shutdown_asap = 0; server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD; server.repl_timeout = REDIS_REPL_TIMEOUT; - server.cluster_enabled = 0; - server.cluster.configfile = zstrdup("nodes.conf"); server.lua_caller = NULL; server.lua_time_limit = REDIS_LUA_TIME_LIMIT; server.lua_client = NULL; @@ -1151,7 +1202,6 @@ void initServer() { server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION; } - if (server.cluster_enabled) clusterInit(); scriptingInit(); slowlogInit(); bioInit(); @@ -1374,29 +1424,6 @@ int processCommand(redisClient *c) { return REDIS_OK; } - /* If cluster is enabled, redirect here */ - if (server.cluster_enabled && - !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) { - int hashslot; - - if (server.cluster.state != REDIS_CLUSTER_OK) { - addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information"); - return REDIS_OK; - } else { - int ask; - clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask); - if (n == NULL) { - addReplyError(c,"Multi keys request invalid in cluster"); - return REDIS_OK; - } else if (n != server.cluster.myself) { - addReplySds(c,sdscatprintf(sdsempty(), - "-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED", - hashslot,n->ip,n->port)); - return REDIS_OK; - } - } - } - /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile @@ -1884,15 +1911,6 @@ sds genRedisInfoString(char *section) { } } - /* Clusetr */ - if (allsections || defsections || !strcasecmp(section,"cluster")) { - if (sections++) info = sdscat(info,"\r\n"); - info = sdscatprintf(info, - "# Cluster\r\n" - "cluster_enabled:%d\r\n", - server.cluster_enabled); - } - /* Key space */ if (allsections || defsections || !strcasecmp(section,"keyspace")) { if (sections++) info = sdscat(info,"\r\n"); @@ -2172,7 +2190,7 @@ void redisAsciiArt(void) { redisGitSHA1(), strtol(redisGitDirty(),NULL,10) > 0, (sizeof(long) == 8) ? "64" : "32", - server.cluster_enabled ? "cluster" : "stand alone", + "stand alone", server.port, (long) getpid() );