static void setProtocolError(redisClient *c, int pos);
+/* sds strings keep their header in front of the pointer handed back to
+ * the caller, so zmalloc_size() can't be called on them directly: it has
+ * to be given the start of the allocation. This helper steps back over
+ * the header first, letting us measure the allocated size of the sds
+ * strings stored in a client output buffer. */
+size_t zmalloc_size_sds(sds s) {
+    void *alloc_start = s-sizeof(struct sdshdr);
+
+    return zmalloc_size(alloc_start);
+}
+
void *dupClientReplyValue(void *o) {
incrRefCount((robj*)o);
return o;
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
- c->bufpos = 0;
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the Redis commands needs to be executed
selectDb(c,0);
c->fd = fd;
+ c->bufpos = 0;
c->querybuf = sdsempty();
c->reqtype = 0;
c->argc = 0;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
- c->lastinteraction = time(NULL);
+ c->ctime = c->lastinteraction = time(NULL);
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
c->reply = listCreate();
if (listLength(c->reply) == 0) {
incrRefCount(o);
listAddNodeTail(c->reply,o);
+ c->reply_bytes += zmalloc_size_sds(o->ptr);
} else {
tail = listNodeValue(listLast(c->reply));
if (tail->ptr != NULL &&
sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
{
+ c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
+ c->reply_bytes += zmalloc_size_sds(tail->ptr);
} else {
incrRefCount(o);
listAddNodeTail(c->reply,o);
+ c->reply_bytes += zmalloc_size_sds(o->ptr);
}
}
- c->reply_bytes += sdslen(o->ptr);
asyncCloseClientOnOutputBufferLimitReached(c);
}
return;
}
- c->reply_bytes += sdslen(s);
if (listLength(c->reply) == 0) {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
+ c->reply_bytes += zmalloc_size_sds(s);
} else {
tail = listNodeValue(listLast(c->reply));
if (tail->ptr != NULL &&
sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
{
+ c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
+ c->reply_bytes += zmalloc_size_sds(tail->ptr);
sdsfree(s);
} else {
listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
+ c->reply_bytes += zmalloc_size_sds(s);
}
}
asyncCloseClientOnOutputBufferLimitReached(c);
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;
if (listLength(c->reply) == 0) {
- listAddNodeTail(c->reply,createStringObject(s,len));
+ robj *o = createStringObject(s,len);
+
+ listAddNodeTail(c->reply,o);
+ c->reply_bytes += zmalloc_size_sds(o->ptr);
} else {
tail = listNodeValue(listLast(c->reply));
if (tail->ptr != NULL &&
sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
{
+ c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,s,len);
+ c->reply_bytes += zmalloc_size_sds(tail->ptr);
} else {
- listAddNodeTail(c->reply,createStringObject(s,len));
+ robj *o = createStringObject(s,len);
+
+ listAddNodeTail(c->reply,o);
+ c->reply_bytes += zmalloc_size_sds(o->ptr);
}
}
- c->reply_bytes += len;
asyncCloseClientOnOutputBufferLimitReached(c);
}
len = listNodeValue(ln);
len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
- c->reply_bytes += sdslen(len->ptr);
+ c->reply_bytes += zmalloc_size_sds(len->ptr);
if (ln->next != NULL) {
next = listNodeValue(ln->next);
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen;
+ size_t objmem;
robj *o;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
} else {
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr);
+ objmem = zmalloc_size_sds(o->ptr);
if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply));
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
- c->reply_bytes -= objlen;
+ c->reply_bytes -= objmem;
}
}
/* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
}
-void closeTimedoutClients(void) {
- redisClient *c;
- listNode *ln;
- time_t now = time(NULL);
- listIter li;
-
- listRewind(server.clients,&li);
- while ((ln = listNext(&li)) != NULL) {
- c = listNodeValue(ln);
- if (server.maxidletime &&
- !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
- !(c->flags & REDIS_MASTER) && /* no timeout for masters */
- !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
- dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
- listLength(c->pubsub_patterns) == 0 &&
- (now - c->lastinteraction > server.maxidletime))
- {
- redisLog(REDIS_VERBOSE,"Closing idle client");
- freeClient(c);
- } else if (c->flags & REDIS_BLOCKED) {
- if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
- addReply(c,shared.nullmultibulk);
- unblockClientWaitingData(c);
- }
- }
- }
-}
-
int processInlineBuffer(redisClient *c) {
char *newline = strstr(c->querybuf,"\r\n");
int argc, j;
time_t now = time(NULL);
int emask;
- if (anetPeerToString(client->fd,ip,&port) == -1) {
- ip[0] = '?';
- ip[1] = '\0';
- port = 0;
- }
+ anetPeerToString(client->fd,ip,&port);
p = flags;
if (client->flags & REDIS_SLAVE) {
if (client->flags & REDIS_MONITOR)
if (emask & AE_WRITABLE) *p++ = 'w';
*p = '\0';
return sdscatprintf(sdsempty(),
- "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s",
+ "addr=%s:%d fd=%d age=%ld idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu qbuf-free=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s",
ip,port,client->fd,
+ (long)(now - client->ctime),
(long)(now - client->lastinteraction),
flags,
client->db->id,
(int) dictSize(client->pubsub_channels),
(int) listLength(client->pubsub_patterns),
(unsigned long) sdslen(client->querybuf),
+ (unsigned long) sdsavail(client->querybuf),
(unsigned long) client->bufpos,
(unsigned long) listLength(client->reply),
getClientOutputBufferMemoryUsage(client),
* the caller wishes. The main usage of this function currently is
* enforcing the client output length limits. */
unsigned long getClientOutputBufferMemoryUsage(redisClient *c) {
- unsigned long list_item_size = sizeof(listNode);
+ unsigned long list_item_size = sizeof(listNode)+sizeof(robj); /* fixed cost per reply entry: the list node plus the robj it holds */
return c->reply_bytes + (list_item_size*listLength(c->reply));
}
* called from contexts where the client can't be freed safely, i.e. from the
* lower level functions pushing data inside the client output buffers. */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
- if (c->flags & REDIS_CLOSE_ASAP) return;
+ if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
if (checkClientOutputBufferLimits(c)) {
sds client = getClientInfoString(c);
sdsfree(client);
}
}
+
+/* Push pending replies to the slaves right away, without going back to
+ * the event loop. Meant to be called by freeMemoryIfNeeded().
+ *
+ * A slave is written to only when all of the following hold: a writable
+ * event is registered for its socket, its replication state is
+ * REDIS_REPL_ONLINE, and its reply list is not empty. */
+void flushSlavesOutputBuffers(void) {
+    listIter iter;
+    listNode *node;
+
+    listRewind(server.slaves,&iter);
+    while ((node = listNext(&iter)) != NULL) {
+        redisClient *slave = listNodeValue(node);
+        int mask = aeGetFileEvents(server.el,slave->fd);
+
+        if (!(mask & AE_WRITABLE)) continue;
+        if (slave->replstate != REDIS_REPL_ONLINE) continue;
+        if (listLength(slave->reply) == 0) continue;
+
+        sendReplyToClient(server.el,slave->fd,slave,0);
+    }
+}