]> git.saurik.com Git - redis.git/blobdiff - src/networking.c
JSON support for Lua scripting, based on work from @lp, thanks!. We are using the...
[redis.git] / src / networking.c
index 7319bd90802fa5c1ac7bc6297f78895cd5edf807..7ed5c6fcf3ab4d64377eb81912aa6f5001f1791b 100644 (file)
@@ -14,14 +14,20 @@ redisClient *createClient(int fd) {
     redisClient *c = zmalloc(sizeof(redisClient));
     c->bufpos = 0;
 
-    anetNonBlock(NULL,fd);
-    anetTcpNoDelay(NULL,fd);
-    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
-        readQueryFromClient, c) == AE_ERR)
-    {
-        close(fd);
-        zfree(c);
-        return NULL;
+    /* passing -1 as fd it is possible to create a non connected client.
+     * This is useful since all the Redis commands needs to be executed
+     * in the context of a client. When commands are executed in other
+     * contexts (for instance a Lua script) we need a non connected client. */
+    if (fd != -1) {
+        anetNonBlock(NULL,fd);
+        anetTcpNoDelay(NULL,fd);
+        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
+            readQueryFromClient, c) == AE_ERR)
+        {
+            close(fd);
+            zfree(c);
+            return NULL;
+        }
     }
 
     selectDb(c,0);
@@ -52,7 +58,7 @@ redisClient *createClient(int fd) {
     c->pubsub_patterns = listCreate();
     listSetFreeMethod(c->pubsub_patterns,decrRefCount);
     listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
-    listAddNodeTail(server.clients,c);
+    if (fd != -1) listAddNodeTail(server.clients,c);
     initClientMultiState(c);
     return c;
 }
@@ -60,6 +66,7 @@ redisClient *createClient(int fd) {
 /* Set the event loop to listen for write events on the client's socket.
  * Typically gets called every time a reply is built. */
 int _installWriteEvent(redisClient *c) {
+    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
     if (c->fd <= 0) return REDIS_ERR;
     if (c->bufpos == 0 && listLength(c->reply) == 0 &&
         (c->replstate == REDIS_REPL_NONE ||
@@ -240,10 +247,17 @@ void addReplyError(redisClient *c, char *err) {
 }
 
 void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
+    size_t l, j;
     va_list ap;
     va_start(ap,fmt);
     sds s = sdscatvprintf(sdsempty(),fmt,ap);
     va_end(ap);
+    /* Make sure there are no newlines in the string, otherwise invalid protocol
+     * is emitted. */
+    l = sdslen(s);
+    for (j = 0; j < l; j++) {
+        if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
+    }
     _addReplyError(c,s,sdslen(s));
     sdsfree(s);
 }
@@ -596,7 +610,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
         }
     }
     if (totwritten > 0) c->lastinteraction = time(NULL);
-    if (listLength(c->reply) == 0) {
+    if (c->bufpos == 0 && listLength(c->reply) == 0) {
         c->sentlen = 0;
         aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
 
@@ -611,6 +625,8 @@ void resetClient(redisClient *c) {
     c->reqtype = 0;
     c->multibulklen = 0;
     c->bulklen = -1;
+    /* We clear the ASKING flag as well if we are not inside a MULTI. */
+    if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
 }
 
 void closeTimedoutClients(void) {
@@ -689,7 +705,7 @@ int processMultibulkBuffer(redisClient *c) {
 
     if (c->multibulklen == 0) {
         /* The client should have been reset */
-        redisAssert(c->argc == 0);
+        redisAssertWithInfo(c,NULL,c->argc == 0);
 
         /* Multi bulk length cannot be read without a \r\n */
         newline = strchr(c->querybuf,'\r');
@@ -702,7 +718,7 @@ int processMultibulkBuffer(redisClient *c) {
 
         /* We know for sure there is a whole line since newline != NULL,
          * so go ahead and find out the multi bulk length. */
-        redisAssert(c->querybuf[0] == '*');
+        redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
         ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
         if (!ok || ll > 1024*1024) {
             addReplyError(c,"Protocol error: invalid multibulk length");
@@ -723,7 +739,7 @@ int processMultibulkBuffer(redisClient *c) {
         c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
     }
 
-    redisAssert(c->multibulklen > 0);
+    redisAssertWithInfo(c,NULL,c->multibulklen > 0);
     while(c->multibulklen) {
         /* Read bulk length if unknown */
         if (c->bulklen == -1) {
@@ -779,6 +795,9 @@ int processMultibulkBuffer(redisClient *c) {
 void processInputBuffer(redisClient *c) {
     /* Keep processing while there is something in the input buffer */
     while(sdslen(c->querybuf)) {
+        /* Immediately abort if the client is in the middle of something. */
+        if (c->flags & REDIS_BLOCKED) return;
+
         /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
          * written to the client. Make sure to not let the reply grow after
          * this flag has been set (i.e. don't process more commands). */
@@ -927,6 +946,9 @@ void clientCommand(redisClient *c) {
     }
 }
 
+/* Rewrite the command vector of the client. All the new objects ref count
+ * is incremented. The old command vector is freed, and the old objects
+ * ref count is decremented. */
 void rewriteClientCommandVector(redisClient *c, int argc, ...) {
     va_list ap;
     int j;
@@ -950,6 +972,24 @@ void rewriteClientCommandVector(redisClient *c, int argc, ...) {
     c->argv = argv;
     c->argc = argc;
     c->cmd = lookupCommand(c->argv[0]->ptr);
-    redisAssert(c->cmd != NULL);
+    redisAssertWithInfo(c,NULL,c->cmd != NULL);
     va_end(ap);
 }
+
+/* Rewrite a single item in the command vector.
+ * The new val ref count is incremented, and the old decremented. */
+void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {
+    robj *oldval;
+   
+    redisAssertWithInfo(c,NULL,i < c->argc);
+    oldval = c->argv[i];
+    c->argv[i] = newval;
+    incrRefCount(newval);
+    decrRefCount(oldval);
+
+    /* If this is the command name make sure to fix c->cmd. */
+    if (i == 0) {
+        c->cmd = lookupCommand(c->argv[0]->ptr);
+        redisAssertWithInfo(c,NULL,c->cmd != NULL);
+    }
+}