redisClient *c = zmalloc(sizeof(redisClient));
c->bufpos = 0;
- anetNonBlock(NULL,fd);
- anetTcpNoDelay(NULL,fd);
- if (aeCreateFileEvent(server.el,fd,AE_READABLE,
- readQueryFromClient, c) == AE_ERR)
- {
- close(fd);
- zfree(c);
- return NULL;
+ /* passing -1 as fd it is possible to create a non connected client.
+ * This is useful since all the Redis commands needs to be executed
+ * in the context of a client. When commands are executed in other
+ * contexts (for instance a Lua script) we need a non connected client. */
+ if (fd != -1) {
+ anetNonBlock(NULL,fd);
+ anetTcpNoDelay(NULL,fd);
+ if (aeCreateFileEvent(server.el,fd,AE_READABLE,
+ readQueryFromClient, c) == AE_ERR)
+ {
+ close(fd);
+ zfree(c);
+ return NULL;
+ }
}
selectDb(c,0);
c->pubsub_patterns = listCreate();
listSetFreeMethod(c->pubsub_patterns,decrRefCount);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
- listAddNodeTail(server.clients,c);
+ if (fd != -1) listAddNodeTail(server.clients,c);
initClientMultiState(c);
return c;
}
/* Set the event loop to listen for write events on the client's socket.
* Typically gets called every time a reply is built. */
int _installWriteEvent(redisClient *c) {
+ if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
if (c->fd <= 0) return REDIS_ERR;
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
}
void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
+ size_t l, j;
va_list ap;
va_start(ap,fmt);
sds s = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
+ /* Make sure there are no newlines in the string, otherwise invalid protocol
+ * is emitted. */
+ l = sdslen(s);
+ for (j = 0; j < l; j++) {
+ if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
+ }
_addReplyError(c,s,sdslen(s));
sdsfree(s);
}
* connection. Note that we create the client instead to check before
* for this condition, since now the socket is already set in nonblocking
* mode and we can send an error for free using the Kernel I/O */
- if (server.maxclients && listLength(server.clients) > server.maxclients) {
+ if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors */
}
}
if (totwritten > 0) c->lastinteraction = time(NULL);
- if (listLength(c->reply) == 0) {
+ if (c->bufpos == 0 && listLength(c->reply) == 0) {
c->sentlen = 0;
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
c->reqtype = 0;
c->multibulklen = 0;
c->bulklen = -1;
+ /* We clear the ASKING flag as well if we are not inside a MULTI. */
+ if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
}
void closeTimedoutClients(void) {
if (c->multibulklen == 0) {
/* The client should have been reset */
- redisAssert(c->argc == 0);
+ redisAssertWithInfo(c,NULL,c->argc == 0);
/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf,'\r');
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
- redisAssert(c->querybuf[0] == '*');
+ redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
if (!ok || ll > 1024*1024) {
addReplyError(c,"Protocol error: invalid multibulk length");
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
}
- redisAssert(c->multibulklen > 0);
+ redisAssertWithInfo(c,NULL,c->multibulklen > 0);
while(c->multibulklen) {
/* Read bulk length if unknown */
if (c->bulklen == -1) {
}
pos += newline-(c->querybuf+pos)+2;
+ if (ll >= REDIS_MBULK_BIG_ARG) {
+ /* If we are going to read a large object from network
+ * try to make it likely that it will start at c->querybuf
+ * boundary so that we can optimized object creation
+ * avoiding a large copy of data. */
+ c->querybuf = sdsrange(c->querybuf,pos,-1);
+ pos = 0;
+ /* Hint the sds library about the amount of bytes this string is
+ * going to contain. */
+ c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2);
+ }
c->bulklen = ll;
}
/* Not enough data (+2 == trailing \r\n) */
break;
} else {
- c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen);
- pos += c->bulklen+2;
+ /* Optimization: if the buffer contanins JUST our bulk element
+ * instead of creating a new object by *copying* the sds we
+ * just use the current sds string. */
+ if (pos == 0 &&
+ c->bulklen >= REDIS_MBULK_BIG_ARG &&
+ (signed) sdslen(c->querybuf) == c->bulklen+2)
+ {
+ c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
+ sdsIncrLen(c->querybuf,-2); /* remove CRLF */
+ c->querybuf = sdsempty();
+ /* Assume that if we saw a fat argument we'll see another one
+ * likely... */
+ c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
+ pos = 0;
+ } else {
+ c->argv[c->argc++] =
+ createStringObject(c->querybuf+pos,c->bulklen);
+ pos += c->bulklen+2;
+ }
c->bulklen = -1;
c->multibulklen--;
}
}
/* Trim to pos */
- c->querybuf = sdsrange(c->querybuf,pos,-1);
+ if (pos) c->querybuf = sdsrange(c->querybuf,pos,-1);
/* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) {
void processInputBuffer(redisClient *c) {
/* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) {
+ /* Immediately abort if the client is in the middle of something. */
+ if (c->flags & REDIS_BLOCKED) return;
+
/* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands). */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
- char buf[REDIS_IOBUF_LEN];
- int nread;
+ int nread, readlen;
+ size_t qblen;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
- nread = read(fd, buf, REDIS_IOBUF_LEN);
+ readlen = REDIS_IOBUF_LEN;
+ /* If this is a multi bulk request, and we are processing a bulk reply
+ * that is large enough, try to maximize the probabilty that the query
+ * buffer contains excatly the SDS string representing the object, even
+ * at the risk of requring more read(2) calls. This way the function
+ * processMultiBulkBuffer() can avoid copying buffers to create the
+ * Redis Object representing the argument. */
+ if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
+ && c->bulklen >= REDIS_MBULK_BIG_ARG)
+ {
+ int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
+
+ if (remaining < readlen) readlen = remaining;
+ }
+
+ qblen = sdslen(c->querybuf);
+ c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
+ nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
nread = 0;
return;
}
if (nread) {
- c->querybuf = sdscatlen(c->querybuf,buf,nread);
+ sdsIncrLen(c->querybuf,nread);
c->lastinteraction = time(NULL);
} else {
return;
}
+ if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
+ sds ci = getClientInfoString(c);
+ redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s", ci);
+ sdsfree(ci);
+ freeClient(c);
+ return;
+ }
processInputBuffer(c);
}
*biggest_input_buffer = bib;
}
+/* Turn a Redis client into an sds string representing its state. */
+sds getClientInfoString(redisClient *client) {
+ char ip[32], flags[16], events[3], *p;
+ int port;
+ time_t now = time(NULL);
+ int emask;
+
+ if (anetPeerToString(client->fd,ip,&port) == -1) {
+ ip[0] = '?';
+ ip[1] = '\0';
+ port = 0;
+ }
+ p = flags;
+ if (client->flags & REDIS_SLAVE) {
+ if (client->flags & REDIS_MONITOR)
+ *p++ = 'O';
+ else
+ *p++ = 'S';
+ }
+ if (client->flags & REDIS_MASTER) *p++ = 'M';
+ if (p == flags) *p++ = 'N';
+ if (client->flags & REDIS_MULTI) *p++ = 'x';
+ if (client->flags & REDIS_BLOCKED) *p++ = 'b';
+ if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
+ if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
+ if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
+ *p++ = '\0';
+
+ emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
+ p = events;
+ if (emask & AE_READABLE) *p++ = 'r';
+ if (emask & AE_WRITABLE) *p++ = 'w';
+ *p = '\0';
+ return sdscatprintf(sdsempty(),
+ "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu obl=%lu oll=%lu events=%s",
+ ip,port,client->fd,
+ (long)(now - client->lastinteraction),
+ flags,
+ client->db->id,
+ (int) dictSize(client->pubsub_channels),
+ (int) listLength(client->pubsub_patterns),
+ (unsigned long) sdslen(client->querybuf),
+ (unsigned long) client->bufpos,
+ (unsigned long) listLength(client->reply),
+ events);
+}
+
void clientCommand(redisClient *c) {
listNode *ln;
listIter li;
if (!strcasecmp(c->argv[1]->ptr,"list") && c->argc == 2) {
sds o = sdsempty();
- time_t now = time(NULL);
listRewind(server.clients,&li);
while ((ln = listNext(&li)) != NULL) {
- char ip[32], flags[16], *p;
- int port;
-
client = listNodeValue(ln);
- if (anetPeerToString(client->fd,ip,&port) == -1) continue;
- p = flags;
- if (client->flags & REDIS_SLAVE) {
- if (client->flags & REDIS_MONITOR)
- *p++ = 'O';
- else
- *p++ = 'S';
- }
- if (client->flags & REDIS_MASTER) *p++ = 'M';
- if (p == flags) *p++ = 'N';
- if (client->flags & REDIS_MULTI) *p++ = 'x';
- if (client->flags & REDIS_BLOCKED) *p++ = 'b';
- if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
- if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
- if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
- *p++ = '\0';
- o = sdscatprintf(o,
- "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d\n",
- ip,port,client->fd,
- (long)(now - client->lastinteraction),
- flags,
- client->db->id,
- (int) dictSize(client->pubsub_channels),
- (int) listLength(client->pubsub_patterns));
+ o = sdscatsds(o,getClientInfoString(client));
+ o = sdscatlen(o,"\n",1);
}
addReplyBulkCBuffer(c,o,sdslen(o));
sdsfree(o);
}
}
+/* Rewrite the command vector of the client. All the new objects ref count
+ * is incremented. The old command vector is freed, and the old objects
+ * ref count is decremented. */
void rewriteClientCommandVector(redisClient *c, int argc, ...) {
va_list ap;
int j;
c->argv = argv;
c->argc = argc;
c->cmd = lookupCommand(c->argv[0]->ptr);
- redisAssert(c->cmd != NULL);
+ redisAssertWithInfo(c,NULL,c->cmd != NULL);
va_end(ap);
}
+
+/* Rewrite a single item in the command vector.
+ * The new val ref count is incremented, and the old decremented. */
+void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {
+ robj *oldval;
+
+ redisAssertWithInfo(c,NULL,i < c->argc);
+ oldval = c->argv[i];
+ c->argv[i] = newval;
+ incrRefCount(newval);
+ decrRefCount(oldval);
+
+ /* If this is the command name make sure to fix c->cmd. */
+ if (i == 0) {
+ c->cmd = lookupCommand(c->argv[0]->ptr);
+ redisAssertWithInfo(c,NULL,c->cmd != NULL);
+ }
+}