From: antirez <antirez@gmail.com>
Date: Tue, 13 Mar 2012 17:05:11 +0000 (+0100)
Subject: Process async client checks like client timeouts and BLPOP timeouts incrementally... 
X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/d19015be12c98f329cdaab039b843c3bf8931916

Process async client checks like client timeouts and BLPOP timeouts incrementally using a circular list.
---

diff --git a/src/adlist.c b/src/adlist.c
index 51ba03bd..e48957e3 100644
--- a/src/adlist.c
+++ b/src/adlist.c
@@ -323,3 +323,19 @@ listNode *listIndex(list *list, long index) {
     }
     return n;
 }
+
+/* Rotate the list by one: unlink the tail node and relink it as the head. */
+void listRotate(list *list) {
+    listNode *tail = list->tail;
+    /* Nothing to rotate when the list has fewer than two nodes. */
+    if (listLength(list) <= 1) return;
+
+    /* Detach the current tail; its predecessor becomes the new tail. */
+    list->tail = tail->prev;
+    list->tail->next = NULL;
+    /* Relink the detached node in front of the current head. */
+    list->head->prev = tail;
+    tail->prev = NULL;
+    tail->next = list->head;
+    list->head = tail;
+}
diff --git a/src/adlist.h b/src/adlist.h
index 36dba1ff..259bd0f8 100644
--- a/src/adlist.h
+++ b/src/adlist.h
@@ -84,6 +84,7 @@ listNode *listSearchKey(list *list, void *key);
 listNode *listIndex(list *list, long index);
 void listRewind(list *list, listIter *li);
 void listRewindTail(list *list, listIter *li);
+void listRotate(list *list);
 
 /* Directions for iterators */
 #define AL_START_HEAD 0
diff --git a/src/networking.c b/src/networking.c
index 06097b58..ae77d11b 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -751,34 +751,6 @@ void resetClient(redisClient *c) {
     if (!(c->flags & REDIS_MULTI)) c->flags &= (~REDIS_ASKING);
 }
 
-void closeTimedoutClients(void) {
-    redisClient *c;
-    listNode *ln;
-    time_t now = time(NULL);
-    listIter li;
-
-    listRewind(server.clients,&li);
-    while ((ln = listNext(&li)) != NULL) {
-        c = listNodeValue(ln);
-        if (server.maxidletime &&
-            !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
-            !(c->flags & REDIS_MASTER) &&   /* no timeout for masters */
-            !(c->flags & REDIS_BLOCKED) &&  /* no timeout for BLPOP */
-            dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
-            listLength(c->pubsub_patterns) == 0 &&
-            (now - c->lastinteraction > server.maxidletime))
-        {
-            redisLog(REDIS_VERBOSE,"Closing idle client");
-            freeClient(c);
-        } else if (c->flags & REDIS_BLOCKED) {
-            if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
-                addReply(c,shared.nullmultibulk);
-                unblockClientWaitingData(c);
-            }
-        }
-    }
-}
-
 int processInlineBuffer(redisClient *c) {
     char *newline = strstr(c->querybuf,"\r\n");
     int argc, j;
diff --git a/src/redis.c b/src/redis.c
index 3294eea4..5741bc90 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -641,6 +641,50 @@ long long getOperationsPerSecond(void) {
     return sum / REDIS_OPS_SEC_SAMPLES;
 }
 
+/* Free c if idle past server.maxidletime, else expire its blocking timeout. */
+void closeTimedoutClient(redisClient *c) {
+    time_t now = time(NULL);
+    if (server.maxidletime &&           /* 0 disables the idle check */
+        !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
+        !(c->flags & REDIS_MASTER) &&   /* no timeout for masters */
+        !(c->flags & REDIS_BLOCKED) &&  /* no timeout for BLPOP */
+        dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
+        listLength(c->pubsub_patterns) == 0 &&
+        (now - c->lastinteraction > server.maxidletime))
+    {
+        redisLog(REDIS_VERBOSE,"Closing idle client");
+        freeClient(c);
+    } else if (c->flags & REDIS_BLOCKED) {
+        if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
+            addReply(c,shared.nullmultibulk);
+            unblockClientWaitingData(c);
+        }
+    }
+}
+
+void clientsCron(void) {
+    /* Process at least 1/100 of the clients per call (and no fewer than 50):
+     * at 10 calls per second every client is checked within 10 seconds in
+     * the worst case, sooner with a reasonable number of clients. Never do
+     * more iterations than there are clients, so that a short list is not
+     * walked several times in the same call. */
+    int numclients = listLength(server.clients);
+    int iterations = numclients/100;
+    if (iterations < 50) iterations = (numclients < 50) ? numclients : 50;
+    while(listLength(server.clients) && iterations--) {
+        redisClient *c;
+        listNode *head;
+
+        /* Rotate the list, take the current head, process.
+         * This way if the client must be removed from the list it is the
+         * first element and we don't incur an O(N) computation. */
+        listRotate(server.clients);
+        head = listFirst(server.clients);
+        c = listNodeValue(head);
+        closeTimedoutClient(c);
+    }
+}
+
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     int j, loops = server.cronloops;
     REDIS_NOTUSED(eventLoop);
@@ -712,9 +756,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
             zmalloc_used_memory());
     }
 
-    /* Close connections of timedout clients */
-    if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
-        closeTimedoutClients();
+    /* We need to do a few operations on clients asynchronously. */
+    clientsCron();
 
     /* Start a scheduled AOF rewrite if this was requested by the user while
      * a BGSAVE was in progress. */