]> git.saurik.com Git - redis.git/blobdiff - src/networking.c
version bumped to 2.9.3
[redis.git] / src / networking.c
index 8f2e6d8f49b7596d725c865c374435cbadd5a500..ddb171b32d470658558fdc0209e3d6c645000170 100644 (file)
@@ -1,6 +1,8 @@
 #include "redis.h"
 #include <sys/uio.h>
 
+static void setProtocolError(redisClient *c, int pos);
+
 void *dupClientReplyValue(void *o) {
     incrRefCount((robj*)o);
     return o;
@@ -14,14 +16,20 @@ redisClient *createClient(int fd) {
     redisClient *c = zmalloc(sizeof(redisClient));
     c->bufpos = 0;
 
-    anetNonBlock(NULL,fd);
-    anetTcpNoDelay(NULL,fd);
-    if (aeCreateFileEvent(server.el,fd,AE_READABLE,
-        readQueryFromClient, c) == AE_ERR)
-    {
-        close(fd);
-        zfree(c);
-        return NULL;
+    /* passing -1 as fd it is possible to create a non connected client.
+     * This is useful since all the Redis commands needs to be executed
+     * in the context of a client. When commands are executed in other
+     * contexts (for instance a Lua script) we need a non connected client. */
+    if (fd != -1) {
+        anetNonBlock(NULL,fd);
+        anetTcpNoDelay(NULL,fd);
+        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
+            readQueryFromClient, c) == AE_ERR)
+        {
+            close(fd);
+            zfree(c);
+            return NULL;
+        }
     }
 
     selectDb(c,0);
@@ -30,6 +38,7 @@ redisClient *createClient(int fd) {
     c->reqtype = 0;
     c->argc = 0;
     c->argv = NULL;
+    c->cmd = c->lastcmd = NULL;
     c->multibulklen = 0;
     c->bulklen = -1;
     c->sentlen = 0;
@@ -51,7 +60,7 @@ redisClient *createClient(int fd) {
     c->pubsub_patterns = listCreate();
     listSetFreeMethod(c->pubsub_patterns,decrRefCount);
     listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
-    listAddNodeTail(server.clients,c);
+    if (fd != -1) listAddNodeTail(server.clients,c);
     initClientMultiState(c);
     return c;
 }
@@ -59,6 +68,7 @@ redisClient *createClient(int fd) {
 /* Set the event loop to listen for write events on the client's socket.
  * Typically gets called every time a reply is built. */
 int _installWriteEvent(redisClient *c) {
+    if (c->flags & REDIS_LUA_CLIENT) return REDIS_OK;
     if (c->fd <= 0) return REDIS_ERR;
     if (c->bufpos == 0 && listLength(c->reply) == 0 &&
         (c->replstate == REDIS_REPL_NONE ||
@@ -239,10 +249,17 @@ void addReplyError(redisClient *c, char *err) {
 }
 
 void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
+    size_t l, j;
     va_list ap;
     va_start(ap,fmt);
     sds s = sdscatvprintf(sdsempty(),fmt,ap);
     va_end(ap);
+    /* Make sure there are no newlines in the string, otherwise invalid protocol
+     * is emitted. */
+    l = sdslen(s);
+    for (j = 0; j < l; j++) {
+        if (s[j] == '\r' || s[j] == '\n') s[j] = ' ';
+    }
     _addReplyError(c,s,sdslen(s));
     sdsfree(s);
 }
@@ -386,6 +403,16 @@ void addReplyBulkLongLong(redisClient *c, long long ll) {
     addReplyBulkCBuffer(c,buf,len);
 }
 
+/* Copy 'src' client output buffers into 'dst' client output buffers.
+ * The function takes care of freeing the old output buffers of the
+ * destination client. */
+void copyClientOutputBuffer(redisClient *dst, redisClient *src) {
+    listRelease(dst->reply);
+    dst->reply = listDup(src->reply);
+    memcpy(dst->buf,src->buf,src->bufpos);
+    dst->bufpos = src->bufpos;
+}
+
 static void acceptCommonHandler(int fd) {
     redisClient *c;
     if ((c = createClient(fd)) == NULL) {
@@ -397,13 +424,14 @@ static void acceptCommonHandler(int fd) {
      * connection. Note that we create the client instead to check before
      * for this condition, since now the socket is already set in nonblocking
      * mode and we can send an error for free using the Kernel I/O */
-    if (server.maxclients && listLength(server.clients) > server.maxclients) {
+    if (listLength(server.clients) > server.maxclients) {
         char *err = "-ERR max number of clients reached\r\n";
 
         /* That's a best effort error message, don't check write errors */
         if (write(c->fd,err,strlen(err)) == -1) {
             /* Nothing to do, Just to avoid the warning... */
         }
+        server.stat_rejected_conn++;
         freeClient(c);
         return;
     }
@@ -447,6 +475,7 @@ static void freeClientArgv(redisClient *c) {
     for (j = 0; j < c->argc; j++)
         decrRefCount(c->argv[j]);
     c->argc = 0;
+    c->cmd = NULL;
 }
 
 void freeClient(redisClient *c) {
@@ -487,25 +516,6 @@ void freeClient(redisClient *c) {
         redisAssert(ln != NULL);
         listDelNode(server.unblocked_clients,ln);
     }
-    /* Remove from the list of clients waiting for swapped keys, or ready
-     * to be restarted, but not yet woken up again. */
-    if (c->flags & REDIS_IO_WAIT) {
-        redisAssert(server.ds_enabled);
-        if (listLength(c->io_keys) == 0) {
-            ln = listSearchKey(server.io_ready_clients,c);
-
-            /* When this client is waiting to be woken up (REDIS_IO_WAIT),
-             * it should be present in the list io_ready_clients */
-            redisAssert(ln != NULL);
-            listDelNode(server.io_ready_clients,ln);
-        } else {
-            while (listLength(c->io_keys)) {
-                ln = listFirst(c->io_keys);
-                dontWaitForSwappedKey(c,ln->value);
-            }
-        }
-        server.cache_blocked_clients--;
-    }
     listRelease(c->io_keys);
     /* Master/slave cleanup.
      * Case 1: we lost the connection with a slave. */
@@ -521,7 +531,7 @@ void freeClient(redisClient *c) {
     /* Case 2: we lost the connection with the master. */
     if (c->flags & REDIS_MASTER) {
         server.master = NULL;
-        server.replstate = REDIS_REPL_CONNECT;
+        server.repl_state = REDIS_REPL_CONNECT;
         server.repl_down_since = time(NULL);
         /* Since we lost the connection with the master, we should also
          * close the connection with all our slaves if we have any, so
@@ -613,7 +623,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
         }
     }
     if (totwritten > 0) c->lastinteraction = time(NULL);
-    if (listLength(c->reply) == 0) {
+    if (c->bufpos == 0 && listLength(c->reply) == 0) {
         c->sentlen = 0;
         aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
 
@@ -628,6 +638,8 @@ void resetClient(redisClient *c) {
     c->reqtype = 0;
     c->multibulklen = 0;
     c->bulklen = -1;
+    /* We clear the ASKING flag as well if we are not inside a MULTI. */
+    if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
 }
 
 void closeTimedoutClients(void) {
@@ -665,8 +677,13 @@ int processInlineBuffer(redisClient *c) {
     size_t querylen;
 
     /* Nothing to do without a \r\n */
-    if (newline == NULL)
+    if (newline == NULL) {
+        if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
+            addReplyError(c,"Protocol error: too big inline request");
+            setProtocolError(c,0);
+        }
         return REDIS_ERR;
+    }
 
     /* Split the input buffer up to the \r\n */
     querylen = newline-(c->querybuf);
@@ -695,6 +712,12 @@ int processInlineBuffer(redisClient *c) {
 /* Helper function. Trims query buffer to make the function that processes
  * multi bulk requests idempotent. */
 static void setProtocolError(redisClient *c, int pos) {
+    if (server.verbosity >= REDIS_VERBOSE) {
+        sds client = getClientInfoString(c);
+        redisLog(REDIS_VERBOSE,
+            "Protocol error from client: %s", client);
+        sdsfree(client);
+    }
     c->flags |= REDIS_CLOSE_AFTER_REPLY;
     c->querybuf = sdsrange(c->querybuf,pos,-1);
 }
@@ -706,12 +729,17 @@ int processMultibulkBuffer(redisClient *c) {
 
     if (c->multibulklen == 0) {
         /* The client should have been reset */
-        redisAssert(c->argc == 0);
+        redisAssertWithInfo(c,NULL,c->argc == 0);
 
         /* Multi bulk length cannot be read without a \r\n */
         newline = strchr(c->querybuf,'\r');
-        if (newline == NULL)
+        if (newline == NULL) {
+            if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
+                addReplyError(c,"Protocol error: too big mbulk count string");
+                setProtocolError(c,0);
+            }
             return REDIS_ERR;
+        }
 
         /* Buffer should also contain \n */
         if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
@@ -719,7 +747,7 @@ int processMultibulkBuffer(redisClient *c) {
 
         /* We know for sure there is a whole line since newline != NULL,
          * so go ahead and find out the multi bulk length. */
-        redisAssert(c->querybuf[0] == '*');
+        redisAssertWithInfo(c,NULL,c->querybuf[0] == '*');
         ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
         if (!ok || ll > 1024*1024) {
             addReplyError(c,"Protocol error: invalid multibulk length");
@@ -740,13 +768,18 @@ int processMultibulkBuffer(redisClient *c) {
         c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
     }
 
-    redisAssert(c->multibulklen > 0);
+    redisAssertWithInfo(c,NULL,c->multibulklen > 0);
     while(c->multibulklen) {
         /* Read bulk length if unknown */
         if (c->bulklen == -1) {
             newline = strchr(c->querybuf+pos,'\r');
-            if (newline == NULL)
+            if (newline == NULL) {
+                if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
+                    addReplyError(c,"Protocol error: too big bulk count string");
+                    setProtocolError(c,0);
+                }
                 break;
+            }
 
             /* Buffer should also contain \n */
             if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
@@ -768,6 +801,17 @@ int processMultibulkBuffer(redisClient *c) {
             }
 
             pos += newline-(c->querybuf+pos)+2;
+            if (ll >= REDIS_MBULK_BIG_ARG) {
+                /* If we are going to read a large object from network
+                 * try to make it likely that it will start at c->querybuf
+                 * boundary so that we can optimized object creation
+                 * avoiding a large copy of data. */
+                c->querybuf = sdsrange(c->querybuf,pos,-1);
+                pos = 0;
+                /* Hint the sds library about the amount of bytes this string is
+                 * going to contain. */
+                c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2);
+            }
             c->bulklen = ll;
         }
 
@@ -776,20 +820,37 @@ int processMultibulkBuffer(redisClient *c) {
             /* Not enough data (+2 == trailing \r\n) */
             break;
         } else {
-            c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen);
-            pos += c->bulklen+2;
+            /* Optimization: if the buffer contanins JUST our bulk element
+             * instead of creating a new object by *copying* the sds we
+             * just use the current sds string. */
+            if (pos == 0 &&
+                c->bulklen >= REDIS_MBULK_BIG_ARG &&
+                (signed) sdslen(c->querybuf) == c->bulklen+2)
+            {
+                c->argv[c->argc++] = createObject(REDIS_STRING,c->querybuf);
+                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
+                c->querybuf = sdsempty();
+                /* Assume that if we saw a fat argument we'll see another one
+                 * likely... */
+                c->querybuf = sdsMakeRoomFor(c->querybuf,c->bulklen+2);
+                pos = 0;
+            } else {
+                c->argv[c->argc++] =
+                    createStringObject(c->querybuf+pos,c->bulklen);
+                pos += c->bulklen+2;
+            }
             c->bulklen = -1;
             c->multibulklen--;
         }
     }
 
     /* Trim to pos */
-    c->querybuf = sdsrange(c->querybuf,pos,-1);
+    if (pos) c->querybuf = sdsrange(c->querybuf,pos,-1);
 
     /* We're done when c->multibulk == 0 */
-    if (c->multibulklen == 0) {
-        return REDIS_OK;
-    }
+    if (c->multibulklen == 0) return REDIS_OK;
+
+    /* Still not read to process the command */
     return REDIS_ERR;
 }
 
@@ -797,7 +858,7 @@ void processInputBuffer(redisClient *c) {
     /* Keep processing while there is something in the input buffer */
     while(sdslen(c->querybuf)) {
         /* Immediately abort if the client is in the middle of something. */
-        if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
+        if (c->flags & REDIS_BLOCKED) return;
 
         /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
          * written to the client. Make sure to not let the reply grow after
@@ -834,12 +895,29 @@ void processInputBuffer(redisClient *c) {
 
 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
     redisClient *c = (redisClient*) privdata;
-    char buf[REDIS_IOBUF_LEN];
-    int nread;
+    int nread, readlen;
+    size_t qblen;
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(mask);
 
-    nread = read(fd, buf, REDIS_IOBUF_LEN);
+    readlen = REDIS_IOBUF_LEN;
+    /* If this is a multi bulk request, and we are processing a bulk reply
+     * that is large enough, try to maximize the probabilty that the query
+     * buffer contains excatly the SDS string representing the object, even
+     * at the risk of requring more read(2) calls. This way the function
+     * processMultiBulkBuffer() can avoid copying buffers to create the
+     * Redis Object representing the argument. */
+    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
+        && c->bulklen >= REDIS_MBULK_BIG_ARG)
+    {
+        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
+
+        if (remaining < readlen) readlen = remaining;
+    }
+
+    qblen = sdslen(c->querybuf);
+    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
+    nread = read(fd, c->querybuf+qblen, readlen);
     if (nread == -1) {
         if (errno == EAGAIN) {
             nread = 0;
@@ -854,11 +932,21 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
         return;
     }
     if (nread) {
-        c->querybuf = sdscatlen(c->querybuf,buf,nread);
+        sdsIncrLen(c->querybuf,nread);
         c->lastinteraction = time(NULL);
     } else {
         return;
     }
+    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
+        sds ci = getClientInfoString(c), bytes = sdsempty();
+
+        bytes = sdscatrepr(bytes,c->querybuf,64);
+        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
+        sdsfree(ci);
+        sdsfree(bytes);
+        freeClient(c);
+        return;
+    }
     processInputBuffer(c);
 }
 
@@ -880,47 +968,80 @@ void getClientsMaxBuffers(unsigned long *longest_output_list,
     *biggest_input_buffer = bib;
 }
 
-void clientCommand(redisClient *c) {
+/* Turn a Redis client into an sds string representing its state. */
+sds getClientInfoString(redisClient *client) {
+    char ip[32], flags[16], events[3], *p;
+    int port;
+    time_t now = time(NULL);
+    int emask;
+
+    if (anetPeerToString(client->fd,ip,&port) == -1) {
+        ip[0] = '?';
+        ip[1] = '\0';
+        port = 0;
+    }
+    p = flags;
+    if (client->flags & REDIS_SLAVE) {
+        if (client->flags & REDIS_MONITOR)
+            *p++ = 'O';
+        else
+            *p++ = 'S';
+    }
+    if (client->flags & REDIS_MASTER) *p++ = 'M';
+    if (client->flags & REDIS_MULTI) *p++ = 'x';
+    if (client->flags & REDIS_BLOCKED) *p++ = 'b';
+    if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
+    if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
+    if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
+    if (p == flags) *p++ = 'N';
+    *p++ = '\0';
+
+    emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd);
+    p = events;
+    if (emask & AE_READABLE) *p++ = 'r';
+    if (emask & AE_WRITABLE) *p++ = 'w';
+    *p = '\0';
+    return sdscatprintf(sdsempty(),
+        "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu obl=%lu oll=%lu events=%s cmd=%s",
+        ip,port,client->fd,
+        (long)(now - client->lastinteraction),
+        flags,
+        client->db->id,
+        (int) dictSize(client->pubsub_channels),
+        (int) listLength(client->pubsub_patterns),
+        (unsigned long) sdslen(client->querybuf),
+        (unsigned long) client->bufpos,
+        (unsigned long) listLength(client->reply),
+        events,
+        client->lastcmd ? client->lastcmd->name : "NULL");
+}
+
+sds getAllClientsInfoString(void) {
     listNode *ln;
     listIter li;
     redisClient *client;
+    sds o = sdsempty();
 
-    if (!strcasecmp(c->argv[1]->ptr,"list") && c->argc == 2) {
-        sds o = sdsempty();
-        time_t now = time(NULL);
+    listRewind(server.clients,&li);
+    while ((ln = listNext(&li)) != NULL) {
+        sds cs;
 
-        listRewind(server.clients,&li);
-        while ((ln = listNext(&li)) != NULL) {
-            char ip[32], flags[16], *p;
-            int port;
+        client = listNodeValue(ln);
+        cs = getClientInfoString(client);
+        o = sdscatsds(o,cs);
+        sdsfree(cs);
+        o = sdscatlen(o,"\n",1);
+    }
+    return o;
+}
 
-            client = listNodeValue(ln);
-            if (anetPeerToString(client->fd,ip,&port) == -1) continue;
-            p = flags;
-            if (client->flags & REDIS_SLAVE) {
-                if (client->flags & REDIS_MONITOR)
-                    *p++ = 'O';
-                else
-                    *p++ = 'S';
-            }
-            if (client->flags & REDIS_MASTER) *p++ = 'M';
-            if (p == flags) *p++ = 'N';
-            if (client->flags & REDIS_MULTI) *p++ = 'x';
-            if (client->flags & REDIS_BLOCKED) *p++ = 'b';
-            if (client->flags & REDIS_IO_WAIT) *p++ = 'i';
-            if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
-            if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
-            if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
-            *p++ = '\0';
-            o = sdscatprintf(o,
-                "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d\n",
-                ip,port,client->fd,
-                (long)(now - client->lastinteraction),
-                flags,
-                client->db->id,
-                (int) dictSize(client->pubsub_channels),
-                (int) listLength(client->pubsub_patterns));
-        }
+void clientCommand(redisClient *c) {
+    listNode *ln;
+    listIter li;
+    redisClient *client;
+
+    if (!strcasecmp(c->argv[1]->ptr,"list") && c->argc == 2) {
+        sds o = getAllClientsInfoString();
         addReplyBulkCBuffer(c,o,sdslen(o));
         sdsfree(o);
     } else if (!strcasecmp(c->argv[1]->ptr,"kill") && c->argc == 3) {
@@ -948,6 +1069,9 @@ void clientCommand(redisClient *c) {
     }
 }
 
+/* Rewrite the command vector of the client. All the new objects ref count
+ * is incremented. The old command vector is freed, and the old objects
+ * ref count is decremented. */
 void rewriteClientCommandVector(redisClient *c, int argc, ...) {
     va_list ap;
     int j;
@@ -970,5 +1094,25 @@ void rewriteClientCommandVector(redisClient *c, int argc, ...) {
     /* Replace argv and argc with our new versions. */
     c->argv = argv;
     c->argc = argc;
+    c->cmd = lookupCommand(c->argv[0]->ptr);
+    redisAssertWithInfo(c,NULL,c->cmd != NULL);
     va_end(ap);
 }
+
+/* Rewrite a single item in the command vector.
+ * The new val ref count is incremented, and the old decremented. */
+void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {
+    robj *oldval;
+   
+    redisAssertWithInfo(c,NULL,i < c->argc);
+    oldval = c->argv[i];
+    c->argv[i] = newval;
+    incrRefCount(newval);
+    decrRefCount(oldval);
+
+    /* If this is the command name make sure to fix c->cmd. */
+    if (i == 0) {
+        c->cmd = lookupCommand(c->argv[0]->ptr);
+        redisAssertWithInfo(c,NULL,c->cmd != NULL);
+    }
+}