]> git.saurik.com Git - redis.git/commitdiff
Implementation of the internals that make possible to terminate clients overcoming...
authorantirez <antirez@gmail.com>
Mon, 23 Jan 2012 15:12:37 +0000 (16:12 +0100)
committerantirez <antirez@gmail.com>
Mon, 23 Jan 2012 15:12:37 +0000 (16:12 +0100)
src/aof.c
src/networking.c
src/redis.c
src/redis.h

index cfbc941d5fe292271e211b27ff519804efa5ffe6..72b476e7b1bf0ec7f1d1a54b8d39071ed35e1cbd 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -296,6 +296,7 @@ struct redisClient *createFakeClient(void) {
     c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
     c->reply = listCreate();
     c->reply_bytes = 0;
+    c->obuf_soft_limit_reached_time = 0;
     c->watched_keys = listCreate();
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
index 47fbcc58b6b452ff711bc772604c9614c4d90599..6c3e7d8148568b9ef379f78286085597c0a0f5db 100644 (file)
@@ -48,6 +48,7 @@ redisClient *createClient(int fd) {
     c->replstate = REDIS_REPL_NONE;
     c->reply = listCreate();
     c->reply_bytes = 0;
+    c->obuf_soft_limit_reached_time = 0;
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
     c->bpop.keys = NULL;
@@ -139,6 +140,7 @@ void _addReplyObjectToList(redisClient *c, robj *o) {
         }
     }
     c->reply_bytes += sdslen(o->ptr);
+    asyncCloseClientOnOutputBufferLimitReached(c);
 }
 
 /* This method takes responsibility over the sds. When it is no longer
@@ -168,6 +170,7 @@ void _addReplySdsToList(redisClient *c, sds s) {
             listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
         }
     }
+    asyncCloseClientOnOutputBufferLimitReached(c);
 }
 
 void _addReplyStringToList(redisClient *c, char *s, size_t len) {
@@ -191,6 +194,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
         }
     }
     c->reply_bytes += len;
+    asyncCloseClientOnOutputBufferLimitReached(c);
 }
 
 /* -----------------------------------------------------------------------------
@@ -318,6 +322,7 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
             listDelNode(c->reply,ln->next);
         }
     }
+    asyncCloseClientOnOutputBufferLimitReached(c);
 }
 
 /* Add a duble as a bulk reply */
@@ -558,12 +563,42 @@ void freeClient(redisClient *c) {
             }
         }
     }
+
+    /* If this client was scheduled for async freeing we need to remove it
+     * from the queue. */
+    if (c->flags & REDIS_CLOSE_ASAP) {
+        ln = listSearchKey(server.clients_to_close,c);
+        redisAssert(ln != NULL);
+        listDelNode(server.clients_to_close,ln);
+    }
+
     /* Release memory */
     zfree(c->argv);
     freeClientMultiState(c);
     zfree(c);
 }
 
+/* Schedule a client to free it at a safe time in the serverCron() function.
+ * This function is useful when we need to terminate a client but we are in
+ * a context where calling freeClient() is not possible, because the client
+ * should be valid for the continuation of the flow of the program. */
+void freeClientAsync(redisClient *c) {
+    if (c->flags & REDIS_CLOSE_ASAP) return;
+    c->flags |= REDIS_CLOSE_ASAP;
+    listAddNodeTail(server.clients_to_close,c);
+}
+
+void freeClientsInAsyncFreeQueue(void) {
+    while (listLength(server.clients_to_close)) {
+        listNode *ln = listFirst(server.clients_to_close);
+        redisClient *c = listNodeValue(ln);
+
+        c->flags &= ~REDIS_CLOSE_ASAP;
+        freeClient(c);
+        listDelNode(server.clients_to_close,ln);
+    }
+}
+
 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
     redisClient *c = privdata;
     int nwritten = 0, totwritten = 0, objlen;
@@ -1006,6 +1041,7 @@ sds getClientInfoString(redisClient *client) {
     if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
     if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
     if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
+    if (client->flags & REDIS_CLOSE_ASAP) *p++ = 'A';
     if (p == flags) *p++ = 'N';
     *p++ = '\0';
 
@@ -1164,3 +1200,63 @@ int getClientLimitClass(redisClient *c) {
         return REDIS_CLIENT_LIMIT_CLASS_PUBSUB;
     return REDIS_CLIENT_LIMIT_CLASS_NORMAL;
 }
+
+/* The function checks if the client reached output buffer soft or hard
+ * limit, and also update the state needed to check the soft limit as
+ * a side effect.
+ *
+ * Return value: non-zero if the client reached the soft or the hard limit.
+ *               Otherwise zero is returned. */
+int checkClientOutputBufferLimits(redisClient *c) {
+    int soft = 0, hard = 0, class;
+    unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
+
+    class = getClientLimitClass(c);
+    if (server.client_obuf_limits[class].hard_limit_bytes &&
+        used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
+        hard = 1;
+    if (server.client_obuf_limits[class].soft_limit_bytes &&
+        used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
+        soft = 1;
+
+    /* We need to check if the soft limit is reached continuously for the
+     * specified amount of seconds. */
+    if (soft) {
+        if (c->obuf_soft_limit_reached_time == 0) {
+            c->obuf_soft_limit_reached_time = server.unixtime;
+            soft = 0; /* First time we see the soft limit reached */
+        } else {
+            time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
+
+            if (elapsed <=
+                server.client_obuf_limits[class].soft_limit_seconds) {
+                soft = 0; /* The client still did not reached the max number of
+                             seconds for the soft limit to be considered
+                             reached. */
+            }
+        }
+    } else {
+        c->obuf_soft_limit_reached_time = 0;
+    }
+    return soft || hard;
+}
+
+/* Asynchronously close a client if soft or hard limit is reached on the
+ * output buffer size. If the client will be closed 1 is returend, otherwise 0
+ * is returned.
+ *
+ * Note: we need to close the client asynchronously because this function is
+ * called from contexts where the client can't be freed safely, i.e. from the
+ * lower level functions pushing data inside the client output buffers. */
+int asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
+    if (checkClientOutputBufferLimits(c)) {
+        sds client = getClientInfoString(c);
+
+        freeClientAsync(c);
+        redisLog(REDIS_NOTICE,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.");
+        sdsfree(client);
+        return 1;
+    } else {
+        return 0;
+    }
+}
index 17d7517f982cd0222d83ab6419dd72687ad4d82d..711cebe03b95005d1eb94cc4d60eb31925ad9527 100644 (file)
@@ -926,6 +926,17 @@ void initServerConfig() {
     server.repl_serve_stale_data = 1;
     server.repl_down_since = -1;
 
+    /* Client output buffer limits */
+    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
+    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_bytes = 0;
+    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_seconds = 0;
+    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].hard_limit_bytes = 0;
+    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_bytes = 1024*1024*256;
+    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_seconds = 60;
+    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].hard_limit_bytes = 1024*1024*256;
+    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_bytes = 1024*1024*32;
+    server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_seconds = 60;
+
     /* Double constants initialization */
     R_Zero = 0.0;
     R_PosInf = 1.0/R_Zero;
@@ -1002,6 +1013,7 @@ void initServer() {
 
     server.current_client = NULL;
     server.clients = listCreate();
+    server.clients_to_close = listCreate();
     server.slaves = listCreate();
     server.monitors = listCreate();
     server.unblocked_clients = listCreate();
index 8b262171c4e970df9b22292b4a85dee19b4fbd8e..dbb56ca3e24c920208ce1f79a913bfae5334bf03 100644 (file)
                                server.unblocked_clients */
 #define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */
 #define REDIS_ASKING 1024   /* Client issued the ASKING command */
+#define REDIS_CLOSE_ASAP 2048 /* Close this client ASAP */
 
 /* Client request types */
 #define REDIS_REQ_INLINE 1
 #define REDIS_CLIENT_LIMIT_CLASS_NORMAL 0
 #define REDIS_CLIENT_LIMIT_CLASS_SLAVE 1
 #define REDIS_CLIENT_LIMIT_CLASS_PUBSUB 2
+#define REDIS_CLIENT_LIMIT_NUM_CLASSES 3
 
 /* Slave replication state - slave side */
 #define REDIS_REPL_NONE 0 /* No active replication */
@@ -315,6 +317,7 @@ typedef struct redisClient {
     unsigned long reply_bytes; /* Tot bytes of objects in reply list */
     int sentlen;
     time_t lastinteraction; /* time of the last interaction, used for timeout */
+    time_t obuf_soft_limit_reached_time;
     int flags;              /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
     int slaveseldb;         /* slave selected db, if this client is a slave */
     int authenticated;      /* when requirepass is non-NULL */
@@ -374,6 +377,12 @@ typedef struct zset {
     zskiplist *zsl;
 } zset;
 
+typedef struct clientBufferLimitsConfig {
+    unsigned long hard_limit_bytes;
+    unsigned long soft_limit_bytes;
+    time_t soft_limit_seconds;
+} clientBufferLimitsConfig;
+
 /*-----------------------------------------------------------------------------
  * Redis cluster data structures
  *----------------------------------------------------------------------------*/
@@ -526,6 +535,7 @@ struct redisServer {
     int sofd;                   /* Unix socket file descriptor */
     int cfd;                    /* Cluster bus lisetning socket */
     list *clients;              /* List of active clients */
+    list *clients_to_close;     /* Clients to close asynchronously */
     list *slaves, *monitors;    /* List of slaves and MONITORs */
     redisClient *current_client; /* Current client, only used on crash report */
     char neterr[ANET_ERR_LEN];  /* Error buffer for anet.c */
@@ -559,6 +569,7 @@ struct redisServer {
     size_t client_max_querybuf_len; /* Limit for client query buffer length */
     int dbnum;                      /* Total number of configured DBs */
     int daemonize;                  /* True if running as a daemon */
+    clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES];
     /* AOF persistence */
     int aof_state;                  /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
     int aof_fsync;                  /* Kind of fsync() policy */
@@ -792,6 +803,8 @@ sds getAllClientsInfoString(void);
 void rewriteClientCommandVector(redisClient *c, int argc, ...);
 void rewriteClientCommandArgument(redisClient *c, int i, robj *newval);
 unsigned long getClientOutputBufferMemoryUsage(redisClient *c);
+void freeClientsInAsyncFreeQueue(void);
+int asyncCloseClientOnOutputBufferLimitReached(redisClient *c);
 
 #ifdef __GNUC__
 void addReplyErrorFormat(redisClient *c, const char *fmt, ...)