X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/890da62eea16bbf67be0c92cf16eba191f8e828e..3c0602ff418ff1a1b18440a6994fd73226786251:/src/networking.c diff --git a/src/networking.c b/src/networking.c index 47fbcc58..6c3e7d81 100644 --- a/src/networking.c +++ b/src/networking.c @@ -48,6 +48,7 @@ redisClient *createClient(int fd) { c->replstate = REDIS_REPL_NONE; c->reply = listCreate(); c->reply_bytes = 0; + c->obuf_soft_limit_reached_time = 0; listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); c->bpop.keys = NULL; @@ -139,6 +140,7 @@ void _addReplyObjectToList(redisClient *c, robj *o) { } } c->reply_bytes += sdslen(o->ptr); + asyncCloseClientOnOutputBufferLimitReached(c); } /* This method takes responsibility over the sds. When it is no longer @@ -168,6 +170,7 @@ void _addReplySdsToList(redisClient *c, sds s) { listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); } } + asyncCloseClientOnOutputBufferLimitReached(c); } void _addReplyStringToList(redisClient *c, char *s, size_t len) { @@ -191,6 +194,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) { } } c->reply_bytes += len; + asyncCloseClientOnOutputBufferLimitReached(c); } /* ----------------------------------------------------------------------------- @@ -318,6 +322,7 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { listDelNode(c->reply,ln->next); } } + asyncCloseClientOnOutputBufferLimitReached(c); } /* Add a duble as a bulk reply */ @@ -558,12 +563,42 @@ void freeClient(redisClient *c) { } } } + + /* If this client was scheduled for async freeing we need to remove it + * from the queue. */ + if (c->flags & REDIS_CLOSE_ASAP) { + ln = listSearchKey(server.clients_to_close,c); + redisAssert(ln != NULL); + listDelNode(server.clients_to_close,ln); + } + /* Release memory */ zfree(c->argv); freeClientMultiState(c); zfree(c); } +/* Schedule a client to free it at a safe time in the serverCron() function. + * This function is useful when we need to terminate a client but we are in + * a context where calling freeClient() is not possible, because the client + * should be valid for the continuation of the flow of the program. */ +void freeClientAsync(redisClient *c) { + if (c->flags & REDIS_CLOSE_ASAP) return; + c->flags |= REDIS_CLOSE_ASAP; + listAddNodeTail(server.clients_to_close,c); +} + +void freeClientsInAsyncFreeQueue(void) { + while (listLength(server.clients_to_close)) { + listNode *ln = listFirst(server.clients_to_close); + redisClient *c = listNodeValue(ln); + + c->flags &= ~REDIS_CLOSE_ASAP; + freeClient(c); + listDelNode(server.clients_to_close,ln); + } +} + void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = privdata; int nwritten = 0, totwritten = 0, objlen; @@ -1006,6 +1041,7 @@ sds getClientInfoString(redisClient *client) { if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd'; if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c'; if (client->flags & REDIS_UNBLOCKED) *p++ = 'u'; + if (client->flags & REDIS_CLOSE_ASAP) *p++ = 'A'; if (p == flags) *p++ = 'N'; *p++ = '\0'; @@ -1164,3 +1200,63 @@ int getClientLimitClass(redisClient *c) { return REDIS_CLIENT_LIMIT_CLASS_PUBSUB; return REDIS_CLIENT_LIMIT_CLASS_NORMAL; } + +/* The function checks if the client reached output buffer soft or hard + * limit, and also update the state needed to check the soft limit as + * a side effect. + * + * Return value: non-zero if the client reached the soft or the hard limit. + * Otherwise zero is returned. */ +int checkClientOutputBufferLimits(redisClient *c) { + int soft = 0, hard = 0, class; + unsigned long used_mem = getClientOutputBufferMemoryUsage(c); + + class = getClientLimitClass(c); + if (server.client_obuf_limits[class].hard_limit_bytes && + used_mem >= server.client_obuf_limits[class].hard_limit_bytes) + hard = 1; + if (server.client_obuf_limits[class].soft_limit_bytes && + used_mem >= server.client_obuf_limits[class].soft_limit_bytes) + soft = 1; + + /* We need to check if the soft limit is reached continuously for the + * specified amount of seconds. */ + if (soft) { + if (c->obuf_soft_limit_reached_time == 0) { + c->obuf_soft_limit_reached_time = server.unixtime; + soft = 0; /* First time we see the soft limit reached */ + } else { + time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time; + + if (elapsed <= + server.client_obuf_limits[class].soft_limit_seconds) { + soft = 0; /* The client still did not reached the max number of + seconds for the soft limit to be considered + reached. */ + } + } + } else { + c->obuf_soft_limit_reached_time = 0; + } + return soft || hard; +} + +/* Asynchronously close a client if soft or hard limit is reached on the + * output buffer size. If the client will be closed 1 is returend, otherwise 0 + * is returned. + * + * Note: we need to close the client asynchronously because this function is + * called from contexts where the client can't be freed safely, i.e. from the + * lower level functions pushing data inside the client output buffers. */ +int asyncCloseClientOnOutputBufferLimitReached(redisClient *c) { + if (checkClientOutputBufferLimits(c)) { + sds client = getClientInfoString(c); + + freeClientAsync(c); + redisLog(REDIS_NOTICE,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits."); + sdsfree(client); + return 1; + } else { + return 0; + } +}