c->fd = fd;
c->bufpos = 0;
c->querybuf = sdsempty();
+ c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
c->argv = NULL;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
- c->lastinteraction = time(NULL);
+ c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
+ c->slave_listening_port = 0;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
/* Only glue when the next node is non-NULL (an sds in this case) */
if (next->ptr != NULL) {
+ c->reply_bytes -= zmalloc_size_sds(len->ptr);
+ c->reply_bytes -= zmalloc_size_sds(next->ptr);
len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
+ c->reply_bytes += zmalloc_size_sds(len->ptr);
listDelNode(c->reply,ln->next);
}
}
c->cmd = NULL;
}
+/* Close the connection of every slave listed in server.slaves by freeing
+ * its client. This is useful in chained replication when we resync with our
+ * own master and want to force all our slaves to resync with us as well.
+ *
+ * The loop always frees the first node of the list and then re-checks the
+ * list length, so it terminates only if freeClient() unlinks the client
+ * from server.slaves (NOTE(review): presumed, confirm in freeClient()). */
+void disconnectSlaves(void) {
+ while (listLength(server.slaves)) {
+ listNode *ln = listFirst(server.slaves);
+ freeClient((redisClient*)ln->value);
+ }
+}
+
void freeClient(redisClient *c) {
listNode *ln;
if (c->flags & REDIS_MASTER) {
server.master = NULL;
server.repl_state = REDIS_REPL_CONNECT;
- server.repl_down_since = time(NULL);
- /* Since we lost the connection with the master, we should also
- * close the connection with all our slaves if we have any, so
- * when we'll resync with the master the other slaves will sync again
- * with us as well. Note that also when the slave is not connected
- * to the master it will keep refusing connections by other slaves.
+ server.repl_down_since = server.unixtime;
+ /* We lost connection with our master, force our slaves to resync
+ * with us as well to load the new data set.
*
- * We do this only if server.masterhost != NULL. If it is NULL this
- * means the user called SLAVEOF NO ONE and we are freeing our
- * link with the master, so no need to close link with slaves. */
- if (server.masterhost != NULL) {
- while (listLength(server.slaves)) {
- ln = listFirst(server.slaves);
- freeClient((redisClient*)ln->value);
- }
- }
+ * If server.masterhost is NULL the user called SLAVEOF NO ONE so
+ * slave resync is not needed. */
+ if (server.masterhost != NULL) disconnectSlaves();
}
/* If this client was scheduled for async freeing we need to remove it
return;
}
}
- if (totwritten > 0) c->lastinteraction = time(NULL);
+ if (totwritten > 0) c->lastinteraction = server.unixtime;
if (c->bufpos == 0 && listLength(c->reply) == 0) {
c->sentlen = 0;
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
}
-void closeTimedoutClients(void) {
- redisClient *c;
- listNode *ln;
- time_t now = time(NULL);
- listIter li;
-
- listRewind(server.clients,&li);
- while ((ln = listNext(&li)) != NULL) {
- c = listNodeValue(ln);
- if (server.maxidletime &&
- !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
- !(c->flags & REDIS_MASTER) && /* no timeout for masters */
- !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
- dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
- listLength(c->pubsub_patterns) == 0 &&
- (now - c->lastinteraction > server.maxidletime))
- {
- redisLog(REDIS_VERBOSE,"Closing idle client");
- freeClient(c);
- } else if (c->flags & REDIS_BLOCKED) {
- if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
- addReply(c,shared.nullmultibulk);
- unblockClientWaitingData(c);
- }
- }
- }
-}
-
int processInlineBuffer(redisClient *c) {
char *newline = strstr(c->querybuf,"\r\n");
int argc, j;
}
qblen = sdslen(c->querybuf);
+ if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
}
if (nread) {
sdsIncrLen(c->querybuf,nread);
- c->lastinteraction = time(NULL);
+ c->lastinteraction = server.unixtime;
} else {
server.current_client = NULL;
return;
sds getClientInfoString(redisClient *client) {
char ip[32], flags[16], events[3], *p;
int port;
- time_t now = time(NULL);
int emask;
anetPeerToString(client->fd,ip,&port);
if (emask & AE_WRITABLE) *p++ = 'w';
*p = '\0';
return sdscatprintf(sdsempty(),
- "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s",
+ "addr=%s:%d fd=%d age=%ld idle=%ld flags=%s db=%d sub=%d psub=%d multi=%d qbuf=%lu qbuf-free=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s",
ip,port,client->fd,
- (long)(now - client->lastinteraction),
+ (long)(server.unixtime - client->ctime),
+ (long)(server.unixtime - client->lastinteraction),
flags,
client->db->id,
(int) dictSize(client->pubsub_channels),
(int) listLength(client->pubsub_patterns),
+ (client->flags & REDIS_MULTI) ? client->mstate.count : -1,
(unsigned long) sdslen(client->querybuf),
+ (unsigned long) sdsavail(client->querybuf),
(unsigned long) client->bufpos,
(unsigned long) listLength(client->reply),
getClientOutputBufferMemoryUsage(client),
* called from contexts where the client can't be freed safely, i.e. from the
* lower level functions pushing data inside the client output buffers. */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
+ redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));
if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
if (checkClientOutputBufferLimits(c)) {
sds client = getClientInfoString(c);