+/*
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
#include "redis.h"
#include <sys/uio.h>
static void setProtocolError(redisClient *c, int pos);
+/* Return the size, as reported by zmalloc_size(), of the allocation that
+ * backs the sds string 's'.
+ *
+ * To evaluate the output buffer size of a client we need the size of the
+ * allocated objects, however we can't use zmalloc_size() directly on sds
+ * strings: the sds header is stored before the returned pointer, so the
+ * pointer is moved back by sizeof(struct sdshdr) before asking for the
+ * allocation size. */
+size_t zmalloc_size_sds(sds s) {
+    void *alloc_start = s-sizeof(struct sdshdr);
+
+    return zmalloc_size(alloc_start);
+}
+
void *dupClientReplyValue(void *o) {
incrRefCount((robj*)o);
return o;
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
- c->bufpos = 0;
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the Redis commands needs to be executed
selectDb(c,0);
c->fd = fd;
+ c->bufpos = 0;
c->querybuf = sdsempty();
+ c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
c->argv = NULL;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
- c->lastinteraction = time(NULL);
+ c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
+ c->slave_listening_port = 0;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
- c->bpop.keys = NULL;
- c->bpop.count = 0;
+ c->bpop.keys = dictCreate(&setDictType,NULL);
c->bpop.timeout = 0;
c->bpop.target = NULL;
c->io_keys = listCreate();
if (listLength(c->reply) == 0) {
incrRefCount(o);
listAddNodeTail(c->reply,o);
+ c->reply_bytes += zmalloc_size_sds(o->ptr);
} else {
tail = listNodeValue(listLast(c->reply));
if (tail->ptr != NULL &&
sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
{
+ c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
+ c->reply_bytes += zmalloc_size_sds(tail->ptr);
} else {
incrRefCount(o);
listAddNodeTail(c->reply,o);
+ c->reply_bytes += zmalloc_size_sds(o->ptr);
}
}
- c->reply_bytes += sdslen(o->ptr);
asyncCloseClientOnOutputBufferLimitReached(c);
}
return;
}
- c->reply_bytes += sdslen(s);
if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
+ c->reply_bytes += zmalloc_size_sds(s);
} else {
tail = listNodeValue(listLast(c->reply));
if (tail->ptr != NULL &&
sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
{
+ c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
+ c->reply_bytes += zmalloc_size_sds(tail->ptr);
sdsfree(s);
} else {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
+ c->reply_bytes += zmalloc_size_sds(s);
}
}
asyncCloseClientOnOutputBufferLimitReached(c);
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
if (listLength(c->reply) == 0) {
- listAddNodeTail(c->reply,createStringObject(s,len));
+ robj *o = createStringObject(s,len);
+
+ listAddNodeTail(c->reply,o);
+ c->reply_bytes += zmalloc_size_sds(o->ptr);
} else {
tail = listNodeValue(listLast(c->reply));
if (tail->ptr != NULL &&
sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
{
+ c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,s,len);
+ c->reply_bytes += zmalloc_size_sds(tail->ptr);
} else {
- listAddNodeTail(c->reply,createStringObject(s,len));
+ robj *o = createStringObject(s,len);
+
+ listAddNodeTail(c->reply,o);
+ c->reply_bytes += zmalloc_size_sds(o->ptr);
}
}
- c->reply_bytes += len;
asyncCloseClientOnOutputBufferLimitReached(c);
}
len = listNodeValue(ln);
len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
- c->reply_bytes += sdslen(len->ptr);
+ c->reply_bytes += zmalloc_size_sds(len->ptr);
if (ln->next != NULL) {
next = listNodeValue(ln->next);
/* Only glue when the next node is non-NULL (an sds in this case) */
if (next->ptr != NULL) {
+ c->reply_bytes -= zmalloc_size_sds(len->ptr);
+ c->reply_bytes -= zmalloc_size_sds(next->ptr);
len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
+ c->reply_bytes += zmalloc_size_sds(len->ptr);
listDelNode(c->reply,ln->next);
}
}
void addReplyLongLongWithPrefix(redisClient *c, long long ll, char prefix) {
char buf[128];
int len;
+
+    /* Things like $3\r\n or *2\r\n are emitted very often by the protocol
+     * so we have a few shared objects to use if the integer is small,
+     * as it is most of the time.
+     *
+     * The ll >= 0 test is required because ll is signed: a negative value
+     * would pass the upper bound check alone and would then be used as a
+     * negative index into the shared header arrays. Negative values fall
+     * through to the generic formatting code below. */
+    if (prefix == '*' && ll >= 0 && ll < REDIS_SHARED_BULKHDR_LEN) {
+        addReply(c,shared.mbulkhdr[ll]);
+        return;
+    } else if (prefix == '$' && ll >= 0 && ll < REDIS_SHARED_BULKHDR_LEN) {
+        addReply(c,shared.bulkhdr[ll]);
+        return;
+    }
+
buf[0] = prefix;
len = ll2string(buf+1,sizeof(buf)-1,ll);
buf[len+1] = '\r';
dst->reply_bytes = src->reply_bytes;
}
-static void acceptCommonHandler(int fd) {
+static void acceptCommonHandler(int fd, int flags) {
redisClient *c;
if ((c = createClient(fd)) == NULL) {
- redisLog(REDIS_WARNING,"Error allocating resoures for the client");
- close(fd); /* May be already closed, just ingore errors */
+ redisLog(REDIS_WARNING,"Error allocating resources for the client");
+ close(fd); /* May be already closed, just ignore errors */
return;
}
/* If maxclient directive is set and this is one client more... close the
return;
}
server.stat_numconnections++;
+ c->flags |= flags;
}
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
- acceptCommonHandler(cfd);
+ acceptCommonHandler(cfd,0);
}
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
- acceptCommonHandler(cfd);
+ acceptCommonHandler(cfd,REDIS_UNIX_SOCKET);
}
c->cmd = NULL;
}
+/* Close the connections of all our slaves. This is useful in chained
+ * replication when we resync with our own master and want to force all our
+ * slaves to resync with us as well.
+ *
+ * The head of server.slaves is freed until the list is empty.
+ * NOTE(review): the loop only terminates if freeClient() unlinks the client
+ * from server.slaves - presumably it does, confirm in freeClient(). */
+void disconnectSlaves(void) {
+    while (listLength(server.slaves) != 0) {
+        redisClient *slave = listNodeValue(listFirst(server.slaves));
+
+        freeClient(slave);
+    }
+}
+
void freeClient(redisClient *c) {
listNode *ln;
if (c->flags & REDIS_MASTER) {
server.master = NULL;
server.repl_state = REDIS_REPL_CONNECT;
- server.repl_down_since = time(NULL);
- /* Since we lost the connection with the master, we should also
- * close the connection with all our slaves if we have any, so
- * when we'll resync with the master the other slaves will sync again
- * with us as well. Note that also when the slave is not connected
- * to the master it will keep refusing connections by other slaves.
+ server.repl_down_since = server.unixtime;
+ /* We lost connection with our master, force our slaves to resync
+ * with us as well to load the new data set.
*
- * We do this only if server.masterhost != NULL. If it is NULL this
- * means the user called SLAVEOF NO ONE and we are freeing our
- * link with the master, so no need to close link with slaves. */
- if (server.masterhost != NULL) {
- while (listLength(server.slaves)) {
- ln = listFirst(server.slaves);
- freeClient((redisClient*)ln->value);
- }
- }
+ * If server.masterhost is NULL the user called SLAVEOF NO ONE so
+ * slave resync is not needed. */
+ if (server.masterhost != NULL) disconnectSlaves();
}
/* If this client was scheduled for async freeing we need to remove it
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen;
+ size_t objmem;
robj *o;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
} else {
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr);
+ objmem = zmalloc_size_sds(o->ptr);
if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply));
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
- c->reply_bytes -= objlen;
+ c->reply_bytes -= objmem;
}
}
- /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
+ /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world
- * scenario think about 'KEYS *' against the loopback interfae) */
- if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
+ * scenario think about 'KEYS *' against the loopback interface).
+ *
+ * However if we are over the maxmemory limit we ignore that and
+ * just deliver as much data as it is possible to deliver. */
+ if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
+ (server.maxmemory == 0 ||
+ zmalloc_used_memory() < server.maxmemory)) break;
}
if (nwritten == -1) {
if (errno == EAGAIN) {
return;
}
}
- if (totwritten > 0) c->lastinteraction = time(NULL);
+ if (totwritten > 0) c->lastinteraction = server.unixtime;
if (c->bufpos == 0 && listLength(c->reply) == 0) {
c->sentlen = 0;
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
}
-void closeTimedoutClients(void) {
- redisClient *c;
- listNode *ln;
- time_t now = time(NULL);
- listIter li;
-
- listRewind(server.clients,&li);
- while ((ln = listNext(&li)) != NULL) {
- c = listNodeValue(ln);
- if (server.maxidletime &&
- !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
- !(c->flags & REDIS_MASTER) && /* no timeout for masters */
- !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
- dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
- listLength(c->pubsub_patterns) == 0 &&
- (now - c->lastinteraction > server.maxidletime))
- {
- redisLog(REDIS_VERBOSE,"Closing idle client");
- freeClient(c);
- } else if (c->flags & REDIS_BLOCKED) {
- if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
- addReply(c,shared.nullmultibulk);
- unblockClientWaitingData(c);
- }
- }
- }
-}
-
int processInlineBuffer(redisClient *c) {
char *newline = strstr(c->querybuf,"\r\n");
int argc, j;
server.current_client = c;
readlen = REDIS_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
- * that is large enough, try to maximize the probabilty that the query
- * buffer contains excatly the SDS string representing the object, even
- * at the risk of requring more read(2) calls. This way the function
+ * that is large enough, try to maximize the probability that the query
+ * buffer contains exactly the SDS string representing the object, even
+ * at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
}
qblen = sdslen(c->querybuf);
+ if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
}
if (nread) {
sdsIncrLen(c->querybuf,nread);
- c->lastinteraction = time(NULL);
+ c->lastinteraction = server.unixtime;
} else {
server.current_client = NULL;
return;
/* Turn a Redis client into an sds string representing its state. */
sds getClientInfoString(redisClient *client) {
char ip[32], flags[16], events[3], *p;
- int port;
- time_t now = time(NULL);
+ int port = 0; /* initialized to zero for the unix socket case. */
int emask;
- if (anetPeerToString(client->fd,ip,&port) == -1) {
- ip[0] = '?';
- ip[1] = '\0';
- port = 0;
- }
+ if (!(client->flags & REDIS_UNIX_SOCKET))
+ anetPeerToString(client->fd,ip,&port);
p = flags;
if (client->flags & REDIS_SLAVE) {
if (client->flags & REDIS_MONITOR)
if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
if (client->flags & REDIS_CLOSE_ASAP) *p++ = 'A';
+ if (client->flags & REDIS_UNIX_SOCKET) *p++ = 'U';
if (p == flags) *p++ = 'N';
*p++ = '\0';
if (emask & AE_WRITABLE) *p++ = 'w';
*p = '\0';
return sdscatprintf(sdsempty(),
- "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s",
- ip,port,client->fd,
- (long)(now - client->lastinteraction),
+ "addr=%s:%d fd=%d age=%ld idle=%ld flags=%s db=%d sub=%d psub=%d multi=%d qbuf=%lu qbuf-free=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s",
+ (client->flags & REDIS_UNIX_SOCKET) ? server.unixsocket : ip,
+ port,client->fd,
+ (long)(server.unixtime - client->ctime),
+ (long)(server.unixtime - client->lastinteraction),
flags,
client->db->id,
(int) dictSize(client->pubsub_channels),
(int) listLength(client->pubsub_patterns),
+ (client->flags & REDIS_MULTI) ? client->mstate.count : -1,
(unsigned long) sdslen(client->querybuf),
+ (unsigned long) sdsavail(client->querybuf),
(unsigned long) client->bufpos,
(unsigned long) listLength(client->reply),
getClientOutputBufferMemoryUsage(client),
*
* Note: this function is very fast so can be called as many time as
* the caller wishes. The main usage of this function currently is
- * enforcing the client output lenght limits. */
+ * enforcing the client output length limits. */
unsigned long getClientOutputBufferMemoryUsage(redisClient *c) {
- unsigned long list_item_size = sizeof(listNode);
+ unsigned long list_item_size = sizeof(listNode)+sizeof(robj); /* fixed per-node cost: the list node plus the robj stored in it */
return c->reply_bytes + (list_item_size*listLength(c->reply));
}
-/* Get the class of a client, used in order to envorce limits to different
+/* Get the class of a client, used in order to enforce limits to different
* classes of clients.
*
* The function will return one of the following:
return REDIS_CLIENT_LIMIT_CLASS_NORMAL;
}
+/* Translate a client limit class name into the matching
+ * REDIS_CLIENT_LIMIT_CLASS_* constant. The comparison is case insensitive.
+ * Returns -1 when the name does not match any known class. */
+int getClientLimitClassByName(char *name) {
+    if (strcasecmp(name,"normal") == 0) return REDIS_CLIENT_LIMIT_CLASS_NORMAL;
+    if (strcasecmp(name,"slave") == 0) return REDIS_CLIENT_LIMIT_CLASS_SLAVE;
+    if (strcasecmp(name,"pubsub") == 0) return REDIS_CLIENT_LIMIT_CLASS_PUBSUB;
+    return -1;
+}
+
+/* Translate a REDIS_CLIENT_LIMIT_CLASS_* constant into its name as a
+ * string literal ("normal", "slave" or "pubsub"). Returns NULL when the
+ * class is not one of the known constants. */
+char *getClientLimitClassName(int class) {
+    if (class == REDIS_CLIENT_LIMIT_CLASS_NORMAL) return "normal";
+    if (class == REDIS_CLIENT_LIMIT_CLASS_SLAVE) return "slave";
+    if (class == REDIS_CLIENT_LIMIT_CLASS_PUBSUB) return "pubsub";
+    return NULL;
+}
+
/* The function checks if the client reached output buffer soft or hard
* limit, and also update the state needed to check the soft limit as
* a side effect.
* called from contexts where the client can't be freed safely, i.e. from the
* lower level functions pushing data inside the client output buffers. */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
- if (c->flags & REDIS_CLOSE_ASAP) return;
+ redisAssert(c->reply_bytes < ULONG_MAX-(1024*64)); /* NOTE(review): presumably catches reply_bytes wrapping below zero after unbalanced accounting - confirm */
+ if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return; /* no reply bytes accounted, or the client is already flagged REDIS_CLOSE_ASAP */
if (checkClientOutputBufferLimits(c)) {
sds client = getClientInfoString(c);
freeClientAsync(c);
- redisLog(REDIS_NOTICE,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
+ redisLog(REDIS_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
sdsfree(client);
}
}
+
+/* Helper function used by freeMemoryIfNeeded() in order to flush the
+ * output buffers of the slaves without returning control to the event loop.
+ *
+ * For every client in server.slaves, sendReplyToClient() is called directly
+ * when all of the following hold: the events reported by aeGetFileEvents()
+ * for the slave fd include AE_WRITABLE, the slave replication state is
+ * REDIS_REPL_ONLINE, and the slave reply list is not empty. */
+void flushSlavesOutputBuffers(void) {
+    listIter iter;
+    listNode *node;
+
+    listRewind(server.slaves,&iter);
+    while ((node = listNext(&iter)) != NULL) {
+        redisClient *slave = listNodeValue(node);
+        int writable = aeGetFileEvents(server.el,slave->fd) & AE_WRITABLE;
+
+        if (!writable) continue;
+        if (slave->replstate != REDIS_REPL_ONLINE) continue;
+        if (listLength(slave->reply) == 0) continue;
+
+        /* The mask argument is not used by sendReplyToClient(). */
+        sendReplyToClient(server.el,slave->fd,slave,0);
+    }
+}