redisClient *c = zmalloc(sizeof(redisClient));
c->bufpos = 0;
- anetNonBlock(NULL,fd);
- anetTcpNoDelay(NULL,fd);
- if (aeCreateFileEvent(server.el,fd,AE_READABLE,
- readQueryFromClient, c) == AE_ERR)
- {
- close(fd);
- zfree(c);
- return NULL;
+ /* By passing -1 as fd it is possible to create a non-connected client.
+ * This is useful since all Redis commands need to be executed
+ * in the context of a client. When commands are executed in other
+ * contexts (for instance a Lua script) we need a non-connected client. */
+ if (fd != -1) {
+ anetNonBlock(NULL,fd);
+ anetTcpNoDelay(NULL,fd);
+ if (aeCreateFileEvent(server.el,fd,AE_READABLE,
+ readQueryFromClient, c) == AE_ERR)
+ {
+ close(fd);
+ zfree(c);
+ return NULL;
+ }
}
selectDb(c,0);
c->reqtype = 0;
c->argc = 0;
c->argv = NULL;
+ c->cmd = NULL;
c->multibulklen = 0;
c->bulklen = -1;
c->sentlen = 0;
c->pubsub_patterns = listCreate();
listSetFreeMethod(c->pubsub_patterns,decrRefCount);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
- listAddNodeTail(server.clients,c);
+ if (fd != -1) listAddNodeTail(server.clients,c);
initClientMultiState(c);
return c;
}
/* Set the event loop to listen for write events on the client's socket.
* Typically gets called every time a reply is built. */
int _installWriteEvent(redisClient *c) {
+ if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
if (c->fd <= 0) return REDIS_ERR;
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
}
void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
+ size_t l, j;
va_list ap;
va_start(ap,fmt);
sds s = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
+ /* Make sure there are no newlines in the string, otherwise invalid protocol
+ * is emitted.
+ *
+ * At this point s holds the message formatted from fmt and the variadic
+ * arguments. Every CR or LF byte in it is overwritten in place with a
+ * space, and the sanitized string is what is passed to _addReplyError()
+ * below before being released with sdsfree(). */
+ l = sdslen(s);
+ for (j = 0; j < l; j++) {
+ if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
+ }
_addReplyError(c,s,sdslen(s));
sdsfree(s);
}
cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
if (cfd == AE_ERR) {
- redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
+ redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
cfd = anetUnixAccept(server.neterr, fd);
if (cfd == AE_ERR) {
- redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
+ redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
for (j = 0; j < c->argc; j++)
decrRefCount(c->argv[j]);
c->argc = 0;
+ c->cmd = NULL;
}
void freeClient(redisClient *c) {
redisAssert(ln != NULL);
listDelNode(server.unblocked_clients,ln);
}
- /* Remove from the list of clients waiting for swapped keys, or ready
- * to be restarted, but not yet woken up again. */
- if (c->flags & REDIS_IO_WAIT) {
- redisAssert(server.ds_enabled);
- if (listLength(c->io_keys) == 0) {
- ln = listSearchKey(server.io_ready_clients,c);
-
- /* When this client is waiting to be woken up (REDIS_IO_WAIT),
- * it should be present in the list io_ready_clients */
- redisAssert(ln != NULL);
- listDelNode(server.io_ready_clients,ln);
- } else {
- while (listLength(c->io_keys)) {
- ln = listFirst(c->io_keys);
- dontWaitForSwappedKey(c,ln->value);
- }
- }
- server.cache_blocked_clients--;
- }
listRelease(c->io_keys);
/* Master/slave cleanup.
* Case 1: we lost the connection with a slave. */
if (c->flags & REDIS_MASTER) {
server.master = NULL;
server.replstate = REDIS_REPL_CONNECT;
+ server.repl_down_since = time(NULL);
/* Since we lost the connection with the master, we should also
* close the connection with all our slaves if we have any, so
* when we'll resync with the master the other slaves will sync again
}
}
if (totwritten > 0) c->lastinteraction = time(NULL);
- if (listLength(c->reply) == 0) {
+ if (c->bufpos == 0 && listLength(c->reply) == 0) {
c->sentlen = 0;
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
c->reqtype = 0;
c->multibulklen = 0;
c->bulklen = -1;
+ /* We clear the ASKING flag as well if we are not inside a MULTI. */
+ if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
}
void closeTimedoutClients(void) {
if (c->multibulklen == 0) {
/* The client should have been reset */
- redisAssert(c->argc == 0);
+ redisAssertWithInfo(c,NULL,c->argc == 0);
/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf,'\r');
if (newline == NULL)
return REDIS_ERR;
+ /* Buffer should also contain \n */
+ if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
+ return REDIS_ERR;
+
/* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */
- redisAssert(c->querybuf[0] == '*');
+ redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
if (!ok || ll > 1024*1024) {
addReplyError(c,"Protocol error: invalid multibulk length");
setProtocolError(c,pos);
return REDIS_ERR;
- } else {
- pos = (newline-c->querybuf)+2;
- if (ll <= 0) {
- c->querybuf = sdsrange(c->querybuf,pos,-1);
- return REDIS_OK;
- }
}
+
+ pos = (newline-c->querybuf)+2;
+ if (ll <= 0) {
+ c->querybuf = sdsrange(c->querybuf,pos,-1);
+ return REDIS_OK;
+ }
+
c->multibulklen = ll;
/* Setup argv array on client structure */
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
-
- /* Search new newline */
- newline = strchr(c->querybuf+pos,'\r');
}
- redisAssert(c->multibulklen > 0);
+ redisAssertWithInfo(c,NULL,c->multibulklen > 0);
while(c->multibulklen) {
/* Read bulk length if unknown */
if (c->bulklen == -1) {
newline = strchr(c->querybuf+pos,'\r');
- if (newline != NULL) {
- if (c->querybuf[pos] != '$') {
- addReplyErrorFormat(c,
- "Protocol error: expected '$', got '%c'",
- c->querybuf[pos]);
- setProtocolError(c,pos);
- return REDIS_ERR;
- }
+ if (newline == NULL)
+ break;
- ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
- if (!ok || ll < 0 || ll > 512*1024*1024) {
- addReplyError(c,"Protocol error: invalid bulk length");
- setProtocolError(c,pos);
- return REDIS_ERR;
- }
- pos += newline-(c->querybuf+pos)+2;
- c->bulklen = ll;
- } else {
- /* No newline in current buffer, so wait for more data */
+ /* Buffer should also contain \n */
+ if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
break;
+
+ if (c->querybuf[pos] != '$') {
+ addReplyErrorFormat(c,
+ "Protocol error: expected '$', got '%c'",
+ c->querybuf[pos]);
+ setProtocolError(c,pos);
+ return REDIS_ERR;
}
+
+ ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
+ if (!ok || ll < 0 || ll > 512*1024*1024) {
+ addReplyError(c,"Protocol error: invalid bulk length");
+ setProtocolError(c,pos);
+ return REDIS_ERR;
+ }
+
+ pos += newline-(c->querybuf+pos)+2;
+ c->bulklen = ll;
}
/* Read bulk argument */
/* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
- if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
+ if (c->flags & REDIS_BLOCKED) return;
/* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
if (p == flags) *p++ = 'N';
if (client->flags & REDIS_MULTI) *p++ = 'x';
if (client->flags & REDIS_BLOCKED) *p++ = 'b';
- if (client->flags & REDIS_IO_WAIT) *p++ = 'i';
if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
addReplyError(c, "Syntax error, try CLIENT (LIST | KILL ip:port)");
}
}
+
+/* Rewrite the command vector of the client.
+ *
+ * The variadic arguments are 'argc' robj pointers that become the new
+ * argument vector, in the order given. The ref count of every new object
+ * is incremented; the old command vector is freed and the ref count of
+ * every old object is decremented. c->cmd is looked up again from the new
+ * argv[0], and the lookup is asserted to succeed.
+ *
+ * 'argc' must be greater than zero: argv[0] is read unconditionally to
+ * resolve the command, so an empty (or negative-sized) vector is rejected
+ * by assertion before anything is allocated or modified. */
+void rewriteClientCommandVector(redisClient *c, int argc, ...) {
+    va_list ap;
+    int j;
+    robj **argv; /* The new argument vector */
+
+    redisAssertWithInfo(c,NULL,argc > 0);
+    argv = zmalloc(sizeof(robj*)*argc);
+    va_start(ap,argc);
+    for (j = 0; j < argc; j++) {
+        robj *a = va_arg(ap, robj*);
+
+        argv[j] = a;
+        incrRefCount(a);
+    }
+    /* All the variadic arguments have been consumed. */
+    va_end(ap);
+
+    /* We free the objects in the original vector at the end, so we are
+     * sure that if the same objects are reused in the new vector the
+     * refcount gets incremented before it gets decremented. */
+    for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
+    zfree(c->argv);
+
+    /* Replace argv and argc with our new versions. */
+    c->argv = argv;
+    c->argc = argc;
+    c->cmd = lookupCommand(c->argv[0]->ptr);
+    redisAssertWithInfo(c,NULL,c->cmd != NULL);
+}
+
+/* Rewrite a single item in the command vector.
+ *
+ * 'i' is the index of the argument to replace and must satisfy
+ * 0 <= i < c->argc (asserted). The ref count of 'newval' is incremented
+ * and the ref count of the value previously stored at that index is
+ * decremented. When the command name (index 0) is replaced, c->cmd is
+ * looked up again and the lookup is asserted to succeed. */
+void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {
+    robj *oldval;
+
+    /* Reject negative indexes too: a bare "i < c->argc" check lets them
+     * through, and argv would then be accessed before its first element. */
+    redisAssertWithInfo(c,NULL,i >= 0 && i < c->argc);
+    oldval = c->argv[i];
+    c->argv[i] = newval;
+    /* Increment first, so that if newval is the very object already stored
+     * at argv[i] its ref count gets incremented before it gets decremented. */
+    incrRefCount(newval);
+    decrRefCount(oldval);
+
+    /* If this is the command name make sure to fix c->cmd. */
+    if (i == 0) {
+        c->cmd = lookupCommand(c->argv[0]->ptr);
+        redisAssertWithInfo(c,NULL,c->cmd != NULL);
+    }
+}