From: antirez Date: Mon, 23 Jan 2012 09:36:07 +0000 (+0100) Subject: Merge branch 'unstable' into limits X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/890da62eea16bbf67be0c92cf16eba191f8e828e?hp=b362c111daa0939f86123cb6fb82fbb389cffc7b Merge branch 'unstable' into limits --- diff --git a/src/aof.c b/src/aof.c index 25d6f454..cfbc941d 100644 --- a/src/aof.c +++ b/src/aof.c @@ -295,6 +295,7 @@ struct redisClient *createFakeClient(void) { * so that Redis will not try to send replies to this client. */ c->replstate = REDIS_REPL_WAIT_BGSAVE_START; c->reply = listCreate(); + c->reply_bytes = 0; c->watched_keys = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); diff --git a/src/networking.c b/src/networking.c index 8a025db3..47fbcc58 100644 --- a/src/networking.c +++ b/src/networking.c @@ -47,6 +47,7 @@ redisClient *createClient(int fd) { c->authenticated = 0; c->replstate = REDIS_REPL_NONE; c->reply = listCreate(); + c->reply_bytes = 0; listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); c->bpop.keys = NULL; @@ -137,6 +138,7 @@ void _addReplyObjectToList(redisClient *c, robj *o) { listAddNodeTail(c->reply,o); } } + c->reply_bytes += sdslen(o->ptr); } /* This method takes responsibility over the sds. 
When it is no longer @@ -149,6 +151,7 @@ void _addReplySdsToList(redisClient *c, sds s) { return; } + c->reply_bytes += sdslen(s); if (listLength(c->reply) == 0) { listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); } else { @@ -187,6 +190,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) { listAddNodeTail(c->reply,createStringObject(s,len)); } } + c->reply_bytes += len; } /* ----------------------------------------------------------------------------- @@ -304,6 +308,7 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { len = listNodeValue(ln); len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length); + c->reply_bytes += sdslen(len->ptr); if (ln->next != NULL) { next = listNodeValue(ln->next); @@ -411,6 +416,7 @@ void copyClientOutputBuffer(redisClient *dst, redisClient *src) { dst->reply = listDup(src->reply); memcpy(dst->buf,src->buf,src->bufpos); dst->bufpos = src->bufpos; + dst->reply_bytes = src->reply_bytes; } static void acceptCommonHandler(int fd) { @@ -606,6 +612,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { if (c->sentlen == objlen) { listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; + c->reply_bytes -= objlen; } } /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT @@ -1008,7 +1015,7 @@ sds getClientInfoString(redisClient *client) { if (emask & AE_WRITABLE) *p++ = 'w'; *p = '\0'; return sdscatprintf(sdsempty(), - "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu obl=%lu oll=%lu events=%s cmd=%s", + "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s", ip,port,client->fd, (long)(now - client->lastinteraction), flags, @@ -1018,6 +1025,7 @@ sds getClientInfoString(redisClient *client) { (unsigned long) sdslen(client->querybuf), (unsigned long) client->bufpos, (unsigned long) listLength(client->reply), + getClientOutputBufferMemoryUsage(client), events, client->lastcmd ? 
client->lastcmd->name : "NULL"); } @@ -1122,3 +1130,37 @@ void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) { redisAssertWithInfo(c,NULL,c->cmd != NULL); } } + +/* This function returns the number of bytes that Redis is virtually + * using to store the reply still not read by the client. + * It is "virtual" since the reply output list may contain objects that + * are shared and are not really using additional memory. + * + * The function returns the total sum of the length of all the objects + * stored in the output list, plus the memory used to allocate every + * list node. The static reply buffer is not taken into account since it + * is allocated anyway. + * + * Note: this function is very fast so can be called as many times as + * the caller wishes. The main usage of this function currently is + * enforcing the client output length limits. */ +unsigned long getClientOutputBufferMemoryUsage(redisClient *c) { + unsigned long list_item_size = sizeof(listNode); + + return c->reply_bytes + (list_item_size*listLength(c->reply)); +} + +/* Get the class of a client, used in order to enforce limits to different + * classes of clients. 
+ * + * The function will return one of the following: + * REDIS_CLIENT_LIMIT_CLASS_NORMAL -> Normal client + * REDIS_CLIENT_LIMIT_CLASS_SLAVE -> Slave or client executing MONITOR command + * REDIS_CLIENT_LIMIT_CLASS_PUBSUB -> Client subscribed to Pub/Sub channels + */ +int getClientLimitClass(redisClient *c) { + if (c->flags & REDIS_SLAVE) return REDIS_CLIENT_LIMIT_CLASS_SLAVE; + if (dictSize(c->pubsub_channels) || listLength(c->pubsub_patterns)) + return REDIS_CLIENT_LIMIT_CLASS_PUBSUB; + return REDIS_CLIENT_LIMIT_CLASS_NORMAL; +} diff --git a/src/redis.h b/src/redis.h index 8889779a..8b262171 100644 --- a/src/redis.h +++ b/src/redis.h @@ -147,6 +147,12 @@ #define REDIS_REQ_INLINE 1 #define REDIS_REQ_MULTIBULK 2 +/* Client classes for client limits, currently used only for + * the max-client-output-buffer limit implementation. */ +#define REDIS_CLIENT_LIMIT_CLASS_NORMAL 0 +#define REDIS_CLIENT_LIMIT_CLASS_SLAVE 1 +#define REDIS_CLIENT_LIMIT_CLASS_PUBSUB 2 + /* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ #define REDIS_REPL_CONNECT 1 /* Must connect to master */ @@ -306,6 +312,7 @@ typedef struct redisClient { int multibulklen; /* number of multi bulk arguments left to read */ long bulklen; /* length of bulk argument in multi bulk request */ list *reply; + unsigned long reply_bytes; /* Tot bytes of objects in reply list */ int sentlen; time_t lastinteraction; /* time of the last interaction, used for timeout */ int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */ @@ -784,6 +791,7 @@ sds getClientInfoString(redisClient *client); sds getAllClientsInfoString(void); void rewriteClientCommandVector(redisClient *c, int argc, ...); void rewriteClientCommandArgument(redisClient *c, int i, robj *newval); +unsigned long getClientOutputBufferMemoryUsage(redisClient *c); #ifdef __GNUC__ void addReplyErrorFormat(redisClient *c, const char *fmt, ...)