]> git.saurik.com Git - redis.git/commitdiff
More threaded I/O VM work + Redis init script
author antirez <antirez@gmail.com>
Mon, 11 Jan 2010 10:15:54 +0000 (05:15 -0500)
committer antirez <antirez@gmail.com>
Mon, 11 Jan 2010 10:15:54 +0000 (05:15 -0500)
Makefile
redis.c
staticsymbols.h
utils/redis_init_script [new file with mode: 0755]
zmalloc.c

index 50b328874dd69024f006365bbbace244d4d2fe30..5caf21b8ec6394442b2546b9f2e9776e545babb8 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -5,12 +5,10 @@
 uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
 ifeq ($(uname_S),SunOS)
   CFLAGS?= -std=c99 -pedantic -O2 -Wall -W -D__EXTENSIONS__ -D_XPG6
-  CCLINK?= -ldl -lnsl -lsocket -lm
-  PTLINK?= -lpthread
+  CCLINK?= -ldl -lnsl -lsocket -lm -lpthread
 else
   CFLAGS?= -std=c99 -pedantic -O2 -Wall -W $(ARCH) $(PROF)
-  CCLINK?= -lm
-  PTLINK?= -lpthread
+  CCLINK?= -lm -pthread
 endif
 CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF)
 DEBUG?= -g -rdynamic -ggdb 
@@ -42,7 +40,7 @@ sds.o: sds.c sds.h zmalloc.h
 zmalloc.o: zmalloc.c config.h
 
 redis-server: $(OBJ)
-       $(CC) -o $(PRGNAME) $(CCOPT) $(PTLINK) $(DEBUG) $(OBJ)
+       $(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ)
        @echo ""
        @echo "Hint: To run the test-redis.tcl script is a good idea."
        @echo "Launch the redis server with ./redis-server, then in another"
diff --git a/redis.c b/redis.c
index 36bee82cf13435e86c357a78ff1764a54073f1ee..6d9e95d5705fea7e0f020e6002e43b1a8693ed4a 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -536,7 +536,8 @@ static void vmInit(void);
 static void vmMarkPagesFree(off_t page, off_t count);
 static robj *vmLoadObject(robj *key);
 static robj *vmPreviewObject(robj *key);
-static int vmSwapOneObject(void);
+static int vmSwapOneObjectBlocking(void);
+static int vmSwapOneObjectThreaded(void);
 static int vmCanSwapOut(void);
 static void freeOneObjectFromFreelist(void);
 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
@@ -1232,11 +1233,17 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
         {
             if (listLength(server.objfreelist)) {
                 freeOneObjectFromFreelist();
-            } else if (vmSwapOneObject() == REDIS_ERR) {
-                if ((loops % 30) == 0 && zmalloc_used_memory() >
-                    (server.vm_max_memory+server.vm_max_memory/10)) {
-                    redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
+            } else {
+                if (vmSwapOneObjectThreaded() == REDIS_ERR) {
+                    if ((loops % 30) == 0 && zmalloc_used_memory() >
+                        (server.vm_max_memory+server.vm_max_memory/10)) {
+                        redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
+                    }
                 }
+                /* Note that we free just one object here, because when
+                 * the I/O thread in charge of swapping this object out
+                 * does its work, the handler of completed jobs will try to
+                 * swap more objects if we are still out of memory. */
                 break;
             }
         }
@@ -1635,6 +1642,7 @@ static void freeClient(redisClient *c) {
     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
     aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
     listRelease(c->reply);
+    listRelease(c->io_keys);
     freeClientArgv(c);
     close(c->fd);
     /* Remove from the list of clients */
@@ -3329,7 +3337,7 @@ static int rdbLoad(char *filename) {
         loadedkeys++;
         if (server.vm_enabled && (loadedkeys % 5000) == 0) {
             while (zmalloc_used_memory() > server.vm_max_memory) {
-                if (vmSwapOneObject() == REDIS_ERR) break;
+                if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
             }
         }
     }
@@ -6605,7 +6613,7 @@ int loadAppendOnlyFile(char *filename) {
         loadedkeys++;
         if (server.vm_enabled && (loadedkeys % 5000) == 0) {
             while (zmalloc_used_memory() > server.vm_max_memory) {
-                if (vmSwapOneObject() == REDIS_ERR) break;
+                if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
             }
         }
     }
@@ -7071,7 +7079,7 @@ static int vmFindContiguousPages(off_t *first, int n) {
  * needed to later retrieve the object into the key object.
  * If we can't find enough contiguous empty pages to swap the object on disk
  * REDIS_ERR is returned. */
-static int vmSwapObject(robj *key, robj *val) {
+static int vmSwapObjectBlocking(robj *key, robj *val) {
     off_t pages = rdbSavedObjectPages(val);
     off_t page;
 
@@ -7080,7 +7088,7 @@ static int vmSwapObject(robj *key, robj *val) {
     if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR;
     if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) {
         redisLog(REDIS_WARNING,
-            "Critical VM problem in vmSwapObject(): can't seek: %s",
+            "Critical VM problem in vmSwapObjectBlocking(): can't seek: %s",
             strerror(errno));
         return REDIS_ERR;
     }
@@ -7100,6 +7108,13 @@ static int vmSwapObject(robj *key, robj *val) {
     return REDIS_OK;
 }
 
+static int vmSwapObjectThreaded(robj *key, robj *val) {
+    /* Stub: does no work, only marks both arguments as used. */
+    (void) key;
+    (void) val;
+    return REDIS_OK;
+}
+
 /* Load the value object relative to the 'key' object from swap to memory.
  * The newly allocated object is returned.
  *
@@ -7221,8 +7236,11 @@ static double computeObjectSwappability(robj *o) {
 
 /* Try to swap an object that's a good candidate for swapping.
  * Returns REDIS_OK if the object was swapped, REDIS_ERR if it's not possible
- * to swap any object at all. */
-static int vmSwapOneObject(void) {
+ * to swap any object at all.
+ *
+ * If 'usethreads' is true, Redis will try to swap the object in background
+ * using I/O threads. */
+static int vmSwapOneObject(int usethreads) {
     int j, i;
     struct dictEntry *best = NULL;
     double best_swappability = 0;
@@ -7269,14 +7287,27 @@ static int vmSwapOneObject(void) {
         key = dictGetEntryKey(best) = newkey;
     }
     /* Swap it */
-    if (vmSwapObject(key,val) == REDIS_OK) {
-        dictGetEntryVal(best) = NULL;
+    if (usethreads) {
+        vmSwapObjectThreaded(key,val);
         return REDIS_OK;
     } else {
-        return REDIS_ERR;
+        if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
+            dictGetEntryVal(best) = NULL;
+            return REDIS_OK;
+        } else {
+            return REDIS_ERR;
+        }
     }
 }
 
+static int vmSwapOneObjectBlocking(void) {
+    return vmSwapOneObject(0); /* usethreads=0: swap in the calling thread */
+}
+
+static int vmSwapOneObjectThreaded(void) {
+    return vmSwapOneObject(1); /* usethreads=1: take the threaded swap path */
+}
+
 /* Return true if it's safe to swap out objects in a given moment.
  * Basically we don't want to swap objects out while there is a BGSAVE
  * or a BGAEOREWRITE running in backgroud. */
@@ -7358,6 +7389,7 @@ static void vmCancelThreadedIOJob(robj *o) {
                     if (job->type == REDIS_IOJOB_SWAP)
                         decrRefCount(job->val);
                     listDelNode(lists[i],ln);
+                    zfree(job);
                     break;
                 case 1: /* io_processing */
                 case 2: /* io_processed */
@@ -7448,7 +7480,7 @@ static void debugCommand(redisClient *c) {
         /* Swap it */
         if (key->storage != REDIS_VM_MEMORY) {
             addReplySds(c,sdsnew("-ERR This key is not in memory\r\n"));
-        } else if (vmSwapObject(key,val) == REDIS_OK) {
+        } else if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
             dictGetEntryVal(de) = NULL;
             addReply(c,shared.ok);
         } else {
index c8ad4a9927f1f709ed96f8e197e3379fa20fc2c9..a7f94694889a08d458c1a7905c1221f4115627f9 100644 (file)
@@ -14,6 +14,7 @@ static struct redisFunctionSym symsTable[] = {
 {"blockingPopGenericCommand",(unsigned long)blockingPopGenericCommand},
 {"blpopCommand",(unsigned long)blpopCommand},
 {"brpopCommand",(unsigned long)brpopCommand},
+{"bytesToHuman",(unsigned long)bytesToHuman},
 {"call",(unsigned long)call},
 {"closeTimedoutClients",(unsigned long)closeTimedoutClients},
 {"compareStringObjects",(unsigned long)compareStringObjects},
@@ -33,6 +34,7 @@ static struct redisFunctionSym symsTable[] = {
 {"decrRefCount",(unsigned long)decrRefCount},
 {"decrbyCommand",(unsigned long)decrbyCommand},
 {"delCommand",(unsigned long)delCommand},
+{"deleteIfSwapped",(unsigned long)deleteIfSwapped},
 {"deleteIfVolatile",(unsigned long)deleteIfVolatile},
 {"deleteKey",(unsigned long)deleteKey},
 {"dictEncObjKeyCompare",(unsigned long)dictEncObjKeyCompare},
@@ -60,6 +62,7 @@ static struct redisFunctionSym symsTable[] = {
 {"freeHashObject",(unsigned long)freeHashObject},
 {"freeListObject",(unsigned long)freeListObject},
 {"freeMemoryIfNeeded",(unsigned long)freeMemoryIfNeeded},
+{"freeOneObjectFromFreelist",(unsigned long)freeOneObjectFromFreelist},
 {"freeSetObject",(unsigned long)freeSetObject},
 {"freeStringObject",(unsigned long)freeStringObject},
 {"freeZsetObject",(unsigned long)freeZsetObject},
@@ -90,6 +93,7 @@ static struct redisFunctionSym symsTable[] = {
 {"lindexCommand",(unsigned long)lindexCommand},
 {"llenCommand",(unsigned long)llenCommand},
 {"loadServerConfig",(unsigned long)loadServerConfig},
+{"lockThreadedIO",(unsigned long)lockThreadedIO},
 {"lookupKey",(unsigned long)lookupKey},
 {"lookupKeyByPattern",(unsigned long)lookupKeyByPattern},
 {"lookupKeyRead",(unsigned long)lookupKeyRead},
@@ -198,17 +202,26 @@ static struct redisFunctionSym symsTable[] = {
 {"ttlCommand",(unsigned long)ttlCommand},
 {"typeCommand",(unsigned long)typeCommand},
 {"unblockClient",(unsigned long)unblockClient},
+{"unlockThreadedIO",(unsigned long)unlockThreadedIO},
 {"updateSlavesWaitingBgsave",(unsigned long)updateSlavesWaitingBgsave},
+{"vmCanSwapOut",(unsigned long)vmCanSwapOut},
+{"vmCancelThreadedIOJob",(unsigned long)vmCancelThreadedIOJob},
 {"vmFindContiguousPages",(unsigned long)vmFindContiguousPages},
 {"vmFreePage",(unsigned long)vmFreePage},
+{"vmGenericLoadObject",(unsigned long)vmGenericLoadObject},
 {"vmInit",(unsigned long)vmInit},
 {"vmLoadObject",(unsigned long)vmLoadObject},
 {"vmMarkPageFree",(unsigned long)vmMarkPageFree},
 {"vmMarkPageUsed",(unsigned long)vmMarkPageUsed},
 {"vmMarkPagesFree",(unsigned long)vmMarkPagesFree},
 {"vmMarkPagesUsed",(unsigned long)vmMarkPagesUsed},
-{"vmSwapObject",(unsigned long)vmSwapObject},
+{"vmPreviewObject",(unsigned long)vmPreviewObject},
+{"vmSwapObjectBlocking",(unsigned long)vmSwapObjectBlocking},
+{"vmSwapObjectThreaded",(unsigned long)vmSwapObjectThreaded},
 {"vmSwapOneObject",(unsigned long)vmSwapOneObject},
+{"vmSwapOneObjectBlocking",(unsigned long)vmSwapOneObjectBlocking},
+{"vmSwapOneObjectThreaded",(unsigned long)vmSwapOneObjectThreaded},
+{"vmThreadedIOCompletedJob",(unsigned long)vmThreadedIOCompletedJob},
 {"yesnotoi",(unsigned long)yesnotoi},
 {"zaddCommand",(unsigned long)zaddCommand},
 {"zaddGenericCommand",(unsigned long)zaddGenericCommand},
diff --git a/utils/redis_init_script b/utils/redis_init_script
new file mode 100755 (executable)
index 0000000..35b906f
--- /dev/null
@@ -0,0 +1,36 @@
+#!/bin/sh
+# Init script for one Redis server: "start" launches it, "stop" shuts it down.
+REDISPORT=6379
+EXEC=/usr/local/bin/redis-server
+# The pid file and the configuration file are both named after the port.
+PIDFILE=/var/run/redis_${REDISPORT}.pid
+CONF="/etc/redis/${REDISPORT}.conf"
+# stop: sends SHUTDOWN through nc, then polls /proc/<pid> once per second.
+case "$1" in
+    start)
+        if [ -f "$PIDFILE" ]
+        then
+                echo "$PIDFILE exists, process is already running or crashed"
+        else
+                echo "Starting Redis server..."
+                "$EXEC" "$CONF"
+        fi
+        ;;
+    stop)
+        if [ ! -f "$PIDFILE" ]
+        then
+                echo "$PIDFILE does not exist, process is not running"
+        else
+                echo "Stopping ..."
+                printf 'SHUTDOWN\r\n' | nc localhost "$REDISPORT" &
+                PID=$(cat "$PIDFILE")
+                while [ -x "/proc/${PID}" ]
+                do
+                    echo "Waiting for Redis to shutdown ..."
+                    sleep 1
+                done
+                rm "$PIDFILE"
+                echo "Redis stopped"
+        fi
+        ;;
+esac
index eb06da3b8925ab52e1c1d981c8ab2106e695433c..64263756ea2ccb2a5391e028a449fffae4e66773 100644 (file)
--- a/zmalloc.c
+++ b/zmalloc.c
@@ -31,6 +31,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <pthread.h>
 #include "config.h"
 
 #if defined(__sun)
@@ -40,6 +41,7 @@
 #endif
 
 static size_t used_memory = 0;
+pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 static void zmalloc_oom(size_t size) {
     fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",