]> git.saurik.com Git - redis.git/blobdiff - src/networking.c
Track the length of the client pending output buffers (still to transfer) in a new...
[redis.git] / src / networking.c
index f0da4010fe43a109461217eefa2baa76db2ec311..61f849207f89fe60f37b0d6396147fa1a3f18a63 100644 (file)
@@ -1,6 +1,8 @@
 #include "redis.h"
 #include <sys/uio.h>
 
 #include "redis.h"
 #include <sys/uio.h>
 
+static void setProtocolError(redisClient *c, int pos);
+
 void *dupClientReplyValue(void *o) {
     incrRefCount((robj*)o);
     return o;
 void *dupClientReplyValue(void *o) {
     incrRefCount((robj*)o);
     return o;
@@ -45,6 +47,7 @@ redisClient *createClient(int fd) {
     c->authenticated = 0;
     c->replstate = REDIS_REPL_NONE;
     c->reply = listCreate();
     c->authenticated = 0;
     c->replstate = REDIS_REPL_NONE;
     c->reply = listCreate();
+    c->reply_bytes = 0;
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
     c->bpop.keys = NULL;
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
     c->bpop.keys = NULL;
@@ -135,6 +138,7 @@ void _addReplyObjectToList(redisClient *c, robj *o) {
             listAddNodeTail(c->reply,o);
         }
     }
             listAddNodeTail(c->reply,o);
         }
     }
+    c->reply_bytes += sdslen(o->ptr);
 }
 
 /* This method takes responsibility over the sds. When it is no longer
 }
 
 /* This method takes responsibility over the sds. When it is no longer
@@ -147,6 +151,7 @@ void _addReplySdsToList(redisClient *c, sds s) {
         return;
     }
 
         return;
     }
 
+    c->reply_bytes += sdslen(s);
     if (listLength(c->reply) == 0) {
         listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
     } else {
     if (listLength(c->reply) == 0) {
         listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
     } else {
@@ -185,6 +190,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
             listAddNodeTail(c->reply,createStringObject(s,len));
         }
     }
             listAddNodeTail(c->reply,createStringObject(s,len));
         }
     }
+    c->reply_bytes += len;
 }
 
 /* -----------------------------------------------------------------------------
 }
 
 /* -----------------------------------------------------------------------------
@@ -302,6 +308,7 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
 
     len = listNodeValue(ln);
     len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
 
     len = listNodeValue(ln);
     len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
+    c->reply_bytes += sdslen(len->ptr);
     if (ln->next != NULL) {
         next = listNodeValue(ln->next);
 
     if (ln->next != NULL) {
         next = listNodeValue(ln->next);
 
@@ -401,6 +408,17 @@ void addReplyBulkLongLong(redisClient *c, long long ll) {
     addReplyBulkCBuffer(c,buf,len);
 }
 
     addReplyBulkCBuffer(c,buf,len);
 }
 
+/* Copy 'src' client output buffers into 'dst' client output buffers.
+ * The function takes care of freeing the old output buffers of the
+ * destination client. */
+void copyClientOutputBuffer(redisClient *dst, redisClient *src) {
+    listRelease(dst->reply);
+    dst->reply = listDup(src->reply);
+    memcpy(dst->buf,src->buf,src->bufpos);
+    dst->bufpos = src->bufpos;
+    dst->reply_bytes = src->reply_bytes;
+}
+
 static void acceptCommonHandler(int fd) {
     redisClient *c;
     if ((c = createClient(fd)) == NULL) {
 static void acceptCommonHandler(int fd) {
     redisClient *c;
     if ((c = createClient(fd)) == NULL) {
@@ -469,6 +487,9 @@ static void freeClientArgv(redisClient *c) {
 void freeClient(redisClient *c) {
     listNode *ln;
 
 void freeClient(redisClient *c) {
     listNode *ln;
 
+    /* If this is marked as current client unset it */
+    if (server.current_client == c) server.current_client = NULL;
+
     /* Note that if the client we are freeing is blocked into a blocking
      * call, we have to set querybuf to NULL *before* to call
      * unblockClientWaitingData() to avoid processInputBuffer() will get
     /* Note that if the client we are freeing is blocked into a blocking
      * call, we have to set querybuf to NULL *before* to call
      * unblockClientWaitingData() to avoid processInputBuffer() will get
@@ -519,7 +540,7 @@ void freeClient(redisClient *c) {
     /* Case 2: we lost the connection with the master. */
     if (c->flags & REDIS_MASTER) {
         server.master = NULL;
     /* Case 2: we lost the connection with the master. */
     if (c->flags & REDIS_MASTER) {
         server.master = NULL;
-        server.replstate = REDIS_REPL_CONNECT;
+        server.repl_state = REDIS_REPL_CONNECT;
         server.repl_down_since = time(NULL);
         /* Since we lost the connection with the master, we should also
          * close the connection with all our slaves if we have any, so
         server.repl_down_since = time(NULL);
         /* Since we lost the connection with the master, we should also
          * close the connection with all our slaves if we have any, so
@@ -591,6 +612,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
             if (c->sentlen == objlen) {
                 listDelNode(c->reply,listFirst(c->reply));
                 c->sentlen = 0;
             if (c->sentlen == objlen) {
                 listDelNode(c->reply,listFirst(c->reply));
                 c->sentlen = 0;
+                c->reply_bytes -= objlen;
             }
         }
         /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
             }
         }
         /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
@@ -665,8 +687,13 @@ int processInlineBuffer(redisClient *c) {
     size_t querylen;
 
     /* Nothing to do without a \r\n */
     size_t querylen;
 
     /* Nothing to do without a \r\n */
-    if (newline == NULL)
+    if (newline == NULL) {
+        if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
+            addReplyError(c,"Protocol error: too big inline request");
+            setProtocolError(c,0);
+        }
         return REDIS_ERR;
         return REDIS_ERR;
+    }
 
     /* Split the input buffer up to the \r\n */
     querylen = newline-(c->querybuf);
 
     /* Split the input buffer up to the \r\n */
     querylen = newline-(c->querybuf);
@@ -716,8 +743,13 @@ int processMultibulkBuffer(redisClient *c) {
 
         /* Multi bulk length cannot be read without a \r\n */
         newline = strchr(c->querybuf,'\r');
 
         /* Multi bulk length cannot be read without a \r\n */
         newline = strchr(c->querybuf,'\r');
-        if (newline == NULL)
+        if (newline == NULL) {
+            if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
+                addReplyError(c,"Protocol error: too big mbulk count string");
+                setProtocolError(c,0);
+            }
             return REDIS_ERR;
             return REDIS_ERR;
+        }
 
         /* Buffer should also contain \n */
         if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
 
         /* Buffer should also contain \n */
         if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
@@ -751,8 +783,13 @@ int processMultibulkBuffer(redisClient *c) {
         /* Read bulk length if unknown */
         if (c->bulklen == -1) {
             newline = strchr(c->querybuf+pos,'\r');
         /* Read bulk length if unknown */
         if (c->bulklen == -1) {
             newline = strchr(c->querybuf+pos,'\r');
-            if (newline == NULL)
+            if (newline == NULL) {
+                if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
+                    addReplyError(c,"Protocol error: too big bulk count string");
+                    setProtocolError(c,0);
+                }
                 break;
                 break;
+            }
 
             /* Buffer should also contain \n */
             if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
 
             /* Buffer should also contain \n */
             if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
@@ -821,9 +858,9 @@ int processMultibulkBuffer(redisClient *c) {
     if (pos) c->querybuf = sdsrange(c->querybuf,pos,-1);
 
     /* We're done when c->multibulk == 0 */
     if (pos) c->querybuf = sdsrange(c->querybuf,pos,-1);
 
     /* We're done when c->multibulk == 0 */
-    if (c->multibulklen == 0) {
-        return REDIS_OK;
-    }
+    if (c->multibulklen == 0) return REDIS_OK;
+
+    /* Still not read to process the command */
     return REDIS_ERR;
 }
 
     return REDIS_ERR;
 }
 
@@ -873,6 +910,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(mask);
 
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(mask);
 
+    server.current_client = c;
     readlen = REDIS_IOBUF_LEN;
     /* If this is a multi bulk request, and we are processing a bulk reply
      * that is large enough, try to maximize the probabilty that the query
     readlen = REDIS_IOBUF_LEN;
     /* If this is a multi bulk request, and we are processing a bulk reply
      * that is large enough, try to maximize the probabilty that the query
@@ -908,16 +946,21 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
         sdsIncrLen(c->querybuf,nread);
         c->lastinteraction = time(NULL);
     } else {
         sdsIncrLen(c->querybuf,nread);
         c->lastinteraction = time(NULL);
     } else {
+        server.current_client = NULL;
         return;
     }
     if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
         return;
     }
     if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
-        sds ci = getClientInfoString(c);
-        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s", ci);
+        sds ci = getClientInfoString(c), bytes = sdsempty();
+
+        bytes = sdscatrepr(bytes,c->querybuf,64);
+        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
         sdsfree(ci);
         sdsfree(ci);
+        sdsfree(bytes);
         freeClient(c);
         return;
     }
     processInputBuffer(c);
         freeClient(c);
         return;
     }
     processInputBuffer(c);
+    server.current_client = NULL;
 }
 
 void getClientsMaxBuffers(unsigned long *longest_output_list,
 }
 
 void getClientsMaxBuffers(unsigned long *longest_output_list,
@@ -972,7 +1015,7 @@ sds getClientInfoString(redisClient *client) {
     if (emask & AE_WRITABLE) *p++ = 'w';
     *p = '\0';
     return sdscatprintf(sdsempty(),
     if (emask & AE_WRITABLE) *p++ = 'w';
     *p = '\0';
     return sdscatprintf(sdsempty(),
-        "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu obl=%lu oll=%lu events=%s cmd=%s",
+        "addr=%s:%d fd=%d idle=%ld flags=%s db=%d sub=%d psub=%d qbuf=%lu obl=%lu oll=%lu omem=%lu events=%s cmd=%s",
         ip,port,client->fd,
         (long)(now - client->lastinteraction),
         flags,
         ip,port,client->fd,
         (long)(now - client->lastinteraction),
         flags,
@@ -982,6 +1025,7 @@ sds getClientInfoString(redisClient *client) {
         (unsigned long) sdslen(client->querybuf),
         (unsigned long) client->bufpos,
         (unsigned long) listLength(client->reply),
         (unsigned long) sdslen(client->querybuf),
         (unsigned long) client->bufpos,
         (unsigned long) listLength(client->reply),
+        getClientOutputBufferMemoryUsage(client),
         events,
         client->lastcmd ? client->lastcmd->name : "NULL");
 }
         events,
         client->lastcmd ? client->lastcmd->name : "NULL");
 }
@@ -994,8 +1038,12 @@ sds getAllClientsInfoString(void) {
 
     listRewind(server.clients,&li);
     while ((ln = listNext(&li)) != NULL) {
 
     listRewind(server.clients,&li);
     while ((ln = listNext(&li)) != NULL) {
+        sds cs;
+
         client = listNodeValue(ln);
         client = listNodeValue(ln);
-        o = sdscatsds(o,getClientInfoString(client));
+        cs = getClientInfoString(client);
+        o = sdscatsds(o,cs);
+        sdsfree(cs);
         o = sdscatlen(o,"\n",1);
     }
     return o;
         o = sdscatlen(o,"\n",1);
     }
     return o;
@@ -1082,3 +1130,22 @@ void rewriteClientCommandArgument(redisClient *c, int i, robj *newval) {
         redisAssertWithInfo(c,NULL,c->cmd != NULL);
     }
 }
         redisAssertWithInfo(c,NULL,c->cmd != NULL);
     }
 }
+
+/* This function returns the number of bytes that Redis is virtually
+ * using to store the reply still not read by the client.
+ * It is "virtual" since the reply output list may contain objects that
+ * are shared and are not really using additional memory.
+ *
+ * The function returns the total sum of the length of all the objects
+ * stored in the output list, plus the memory used to allocate every
+ * list node. The static reply buffer is not taken into account since it
+ * is allocated anyway.
+ *
+ * Note: this function is very fast so can be called as many time as
+ * the caller wishes. The main usage of this function currently is
+ * enforcing the client output lenght limits. */
+unsigned long getClientOutputBufferMemoryUsage(redisClient *c) {
+    unsigned long list_item_size = sizeof(listNode);
+
+    return c->reply_bytes + (list_item_size*listLength(c->reply));
+}