c->replstate = REDIS_REPL_NONE;
c->reply = listCreate();
c->reply_bytes = 0;
+ c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
c->bpop.keys = NULL;
}
}
c->reply_bytes += sdslen(o->ptr);
+ asyncCloseClientOnOutputBufferLimitReached(c);
}
/* This method takes responsibility over the sds. When it is no longer
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
}
}
+ asyncCloseClientOnOutputBufferLimitReached(c);
}
void _addReplyStringToList(redisClient *c, char *s, size_t len) {
}
}
c->reply_bytes += len;
+ asyncCloseClientOnOutputBufferLimitReached(c);
}
/* -----------------------------------------------------------------------------
listDelNode(c->reply,ln->next);
}
}
+ asyncCloseClientOnOutputBufferLimitReached(c);
}
/* Add a duble as a bulk reply */
}
}
}
+
+ /* If this client was scheduled for async freeing we need to remove it
+ * from the queue. */
+ if (c->flags & REDIS_CLOSE_ASAP) {
+ ln = listSearchKey(server.clients_to_close,c);
+ redisAssert(ln != NULL);
+ listDelNode(server.clients_to_close,ln);
+ }
+
/* Release memory */
zfree(c->argv);
freeClientMultiState(c);
zfree(c);
}
+/* Schedule a client to free it at a safe time in the serverCron() function.
+ * This function is useful when we need to terminate a client but we are in
+ * a context where calling freeClient() is not possible, because the client
+ * should be valid for the continuation of the flow of the program. */
+void freeClientAsync(redisClient *c) {
+ if (c->flags & REDIS_CLOSE_ASAP) return;
+ c->flags |= REDIS_CLOSE_ASAP;
+ listAddNodeTail(server.clients_to_close,c);
+}
+
+void freeClientsInAsyncFreeQueue(void) {
+ while (listLength(server.clients_to_close)) {
+ listNode *ln = listFirst(server.clients_to_close);
+ redisClient *c = listNodeValue(ln);
+
+ c->flags &= ~REDIS_CLOSE_ASAP;
+ freeClient(c);
+ listDelNode(server.clients_to_close,ln);
+ }
+}
+
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen;
if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
+ if (client->flags & REDIS_CLOSE_ASAP) *p++ = 'A';
if (p == flags) *p++ = 'N';
*p++ = '\0';
return REDIS_CLIENT_LIMIT_CLASS_PUBSUB;
return REDIS_CLIENT_LIMIT_CLASS_NORMAL;
}
+
+/* The function checks if the client reached output buffer soft or hard
+ * limit, and also update the state needed to check the soft limit as
+ * a side effect.
+ *
+ * Return value: non-zero if the client reached the soft or the hard limit.
+ * Otherwise zero is returned. */
+int checkClientOutputBufferLimits(redisClient *c) {
+ int soft = 0, hard = 0, class;
+ unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
+
+ class = getClientLimitClass(c);
+ if (server.client_obuf_limits[class].hard_limit_bytes &&
+ used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
+ hard = 1;
+ if (server.client_obuf_limits[class].soft_limit_bytes &&
+ used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
+ soft = 1;
+
+ /* We need to check if the soft limit is reached continuously for the
+ * specified amount of seconds. */
+ if (soft) {
+ if (c->obuf_soft_limit_reached_time == 0) {
+ c->obuf_soft_limit_reached_time = server.unixtime;
+ soft = 0; /* First time we see the soft limit reached */
+ } else {
+ time_t elapsed = server.unixtime - c->obuf_soft_limit_reached_time;
+
+ if (elapsed <=
+ server.client_obuf_limits[class].soft_limit_seconds) {
+ soft = 0; /* The client still did not reached the max number of
+ seconds for the soft limit to be considered
+ reached. */
+ }
+ }
+ } else {
+ c->obuf_soft_limit_reached_time = 0;
+ }
+ return soft || hard;
+}
+
+/* Asynchronously close a client if soft or hard limit is reached on the
+ * output buffer size. If the client will be closed 1 is returend, otherwise 0
+ * is returned.
+ *
+ * Note: we need to close the client asynchronously because this function is
+ * called from contexts where the client can't be freed safely, i.e. from the
+ * lower level functions pushing data inside the client output buffers. */
+int asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
+ if (checkClientOutputBufferLimits(c)) {
+ sds client = getClientInfoString(c);
+
+ freeClientAsync(c);
+ redisLog(REDIS_NOTICE,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.");
+ sdsfree(client);
+ return 1;
+ } else {
+ return 0;
+ }
+}
server.repl_serve_stale_data = 1;
server.repl_down_since = -1;
+ /* Client output buffer limits */
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_bytes = 0;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_seconds = 0;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].hard_limit_bytes = 0;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_bytes = 1024*1024*256;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_seconds = 60;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].hard_limit_bytes = 1024*1024*256;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_bytes = 1024*1024*32;
+ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_seconds = 60;
+
/* Double constants initialization */
R_Zero = 0.0;
R_PosInf = 1.0/R_Zero;
server.current_client = NULL;
server.clients = listCreate();
+ server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.unblocked_clients = listCreate();
server.unblocked_clients */
#define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */
#define REDIS_ASKING 1024 /* Client issued the ASKING command */
+#define REDIS_CLOSE_ASAP 2048 /* Close this client ASAP */
/* Client request types */
#define REDIS_REQ_INLINE 1
#define REDIS_CLIENT_LIMIT_CLASS_NORMAL 0
#define REDIS_CLIENT_LIMIT_CLASS_SLAVE 1
#define REDIS_CLIENT_LIMIT_CLASS_PUBSUB 2
+#define REDIS_CLIENT_LIMIT_NUM_CLASSES 3
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
int sentlen;
time_t lastinteraction; /* time of the last interaction, used for timeout */
+ time_t obuf_soft_limit_reached_time;
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
int slaveseldb; /* slave selected db, if this client is a slave */
int authenticated; /* when requirepass is non-NULL */
zskiplist *zsl;
} zset;
+typedef struct clientBufferLimitsConfig {
+ unsigned long hard_limit_bytes;
+ unsigned long soft_limit_bytes;
+ time_t soft_limit_seconds;
+} clientBufferLimitsConfig;
+
/*-----------------------------------------------------------------------------
* Redis cluster data structures
*----------------------------------------------------------------------------*/
int sofd; /* Unix socket file descriptor */
int cfd; /* Cluster bus lisetning socket */
list *clients; /* List of active clients */
+ list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */
redisClient *current_client; /* Current client, only used on crash report */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int daemonize; /* True if running as a daemon */
+ clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES];
/* AOF persistence */
int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */
void rewriteClientCommandVector(redisClient *c, int argc, ...);
void rewriteClientCommandArgument(redisClient *c, int i, robj *newval);
unsigned long getClientOutputBufferMemoryUsage(redisClient *c);
+void freeClientsInAsyncFreeQueue(void);
+int asyncCloseClientOnOutputBufferLimitReached(redisClient *c);
#ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)