]> git.saurik.com Git - redis.git/blobdiff - src/aof.c
Merge remote-tracking branch 'origin/unstable' into unstable
[redis.git] / src / aof.c
index 51054b296c2359701e33c0610bd9a715346be91b..b43f99f5e8e5f47cd7ec0e9454c8233606ff6a60 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -3,6 +3,12 @@
 #include <signal.h>
 #include <fcntl.h>
 #include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/resource.h>
+#include <sys/wait.h>
+
+void aofUpdateCurrentSize(void);
 
 /* Called when the user switches from "appendonly yes" to "appendonly no"
  * at runtime using the CONFIG command. */
@@ -15,15 +21,15 @@ void stopAppendOnly(void) {
     server.appendseldb = -1;
     server.appendonly = 0;
     /* rewrite operation in progress? kill it, wait child exit */
-    if (server.bgsavechildpid != -1) {
+    if (server.bgrewritechildpid != -1) {
         int statloc;
 
-        if (kill(server.bgsavechildpid,SIGKILL) != -1)
+        if (kill(server.bgrewritechildpid,SIGKILL) != -1)
             wait3(&statloc,0,NULL);
         /* reset the buffer accumulating changes while the child saves */
         sdsfree(server.bgrewritebuf);
         server.bgrewritebuf = sdsempty();
-        server.bgsavechildpid = -1;
+        server.bgrewritechildpid = -1;
     }
 }
 
@@ -78,6 +84,7 @@ void flushAppendOnlyFile(void) {
     }
     sdsfree(server.aofbuf);
     server.aofbuf = sdsempty();
+    server.appendonly_current_size += nwritten;
 
     /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have
      * childs performing heavy I/O on disk. */
@@ -185,11 +192,13 @@ struct redisClient *createFakeClient(void) {
     c->querybuf = sdsempty();
     c->argc = 0;
     c->argv = NULL;
+    c->bufpos = 0;
     c->flags = 0;
     /* We set the fake client as a slave waiting for the synchronization
      * so that Redis will not try to send replies to this client. */
     c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
     c->reply = listCreate();
+    c->watched_keys = listCreate();
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
     initClientMultiState(c);
@@ -199,6 +208,7 @@ struct redisClient *createFakeClient(void) {
 void freeFakeClient(struct redisClient *c) {
     sdsfree(c->querybuf);
     listRelease(c->reply);
+    listRelease(c->watched_keys);
     freeClientMultiState(c);
     zfree(c);
 }
@@ -211,9 +221,13 @@ int loadAppendOnlyFile(char *filename) {
     FILE *fp = fopen(filename,"r");
     struct redis_stat sb;
     int appendonly = server.appendonly;
+    long loops = 0;
 
-    if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
+    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
+        server.appendonly_current_size = 0;
+        fclose(fp);
         return REDIS_ERR;
+    }
 
     if (fp == NULL) {
         redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
@@ -225,6 +239,8 @@ int loadAppendOnlyFile(char *filename) {
     server.appendonly = 0;
 
     fakeClient = createFakeClient();
+    startLoading(fp);
+
     while(1) {
         int argc, j;
         unsigned long len;
@@ -232,7 +248,12 @@ int loadAppendOnlyFile(char *filename) {
         char buf[128];
         sds argsds;
         struct redisCommand *cmd;
-        int force_swapout;
+
+        /* Serve the clients from time to time */
+        if (!(loops++ % 1000)) {
+            loadingProgress(ftello(fp));
+            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
+        }
 
         if (fgets(buf,sizeof(buf),fp) == NULL) {
             if (feof(fp))
@@ -259,29 +280,21 @@ int loadAppendOnlyFile(char *filename) {
             redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
             exit(1);
         }
-        /* Try object encoding */
-        if (cmd->flags & REDIS_CMD_BULK)
-            argv[argc-1] = tryObjectEncoding(argv[argc-1]);
         /* Run the command in the context of a fake client */
         fakeClient->argc = argc;
         fakeClient->argv = argv;
         cmd->proc(fakeClient);
-        /* Discard the reply objects list from the fake client */
-        while(listLength(fakeClient->reply))
-            listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
-        /* Clean up, ready for the next command */
-        for (j = 0; j < argc; j++) decrRefCount(argv[j]);
-        zfree(argv);
-        /* Handle swapping while loading big datasets when VM is on */
-        force_swapout = 0;
-        if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
-            force_swapout = 1;
-
-        if (server.vm_enabled && force_swapout) {
-            while (zmalloc_used_memory() > server.vm_max_memory) {
-                if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
-            }
-        }
+
+        /* The fake client should not have a reply */
+        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
+        /* The fake client should never get blocked */
+        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);
+
+        /* Clean up. Command code may have changed argv/argc so we use the
+         * argv/argc of the client instead of the local variables. */
+        for (j = 0; j < fakeClient->argc; j++)
+            decrRefCount(fakeClient->argv[j]);
+        zfree(fakeClient->argv);
     }
 
     /* This point can only be reached when EOF is reached without errors.
@@ -291,6 +304,9 @@ int loadAppendOnlyFile(char *filename) {
     fclose(fp);
     freeFakeClient(fakeClient);
     server.appendonly = appendonly;
+    stopLoading();
+    aofUpdateCurrentSize();
+    server.auto_aofrewrite_base_size = server.appendonly_current_size;
     return REDIS_OK;
 
 readerr:
@@ -301,59 +317,10 @@ readerr:
     }
     exit(1);
 fmterr:
-    redisLog(REDIS_WARNING,"Bad file format reading the append only file");
+    redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
     exit(1);
 }
 
-/* Write binary-safe string into a file in the bulkformat
- * $<count>\r\n<payload>\r\n */
-int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
-    char cbuf[128];
-    int clen;
-    cbuf[0] = '$';
-    clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len);
-    cbuf[clen++] = '\r';
-    cbuf[clen++] = '\n';
-    if (fwrite(cbuf,clen,1,fp) == 0) return 0;
-    if (len > 0 && fwrite(s,len,1,fp) == 0) return 0;
-    if (fwrite("\r\n",2,1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Write a double value in bulk format $<count>\r\n<payload>\r\n */
-int fwriteBulkDouble(FILE *fp, double d) {
-    char buf[128], dbuf[128];
-
-    snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
-    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2);
-    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
-    if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Write a long value in bulk format $<count>\r\n<payload>\r\n */
-int fwriteBulkLongLong(FILE *fp, long long l) {
-    char bbuf[128], lbuf[128];
-    unsigned int blen, llen;
-    llen = ll2string(lbuf,32,l);
-    blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf);
-    if (fwrite(bbuf,blen,1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Delegate writing an object to writing a bulk string or bulk long long. */
-int fwriteBulkObject(FILE *fp, robj *obj) {
-    /* Avoid using getDecodedObject to help copy-on-write (we are often
-     * in a child process when this function is called). */
-    if (obj->encoding == REDIS_ENCODING_INT) {
-        return fwriteBulkLongLong(fp,(long)obj->ptr);
-    } else if (obj->encoding == REDIS_ENCODING_RAW) {
-        return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr));
-    } else {
-        redisPanic("Unknown string encoding");
-    }
-}
-
 /* Write a sequence of commands able to fully rebuild the dataset into
  * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
 int rewriteAppendOnlyFile(char *filename) {
@@ -377,7 +344,7 @@ int rewriteAppendOnlyFile(char *filename) {
         redisDb *db = server.db+j;
         dict *d = db->dict;
         if (dictSize(d) == 0) continue;
-        di = dictGetIterator(d);
+        di = dictGetSafeIterator(d);
         if (!di) {
             fclose(fp);
             return REDIS_ERR;
@@ -389,25 +356,14 @@ int rewriteAppendOnlyFile(char *filename) {
 
         /* Iterate this DB writing every entry */
         while((de = dictNext(di)) != NULL) {
-            sds keystr = dictGetEntryKey(de);
+            sds keystr;
             robj key, *o;
             time_t expiretime;
-            int swapped;
 
             keystr = dictGetEntryKey(de);
             o = dictGetEntryVal(de);
             initStaticStringObject(key,keystr);
-            /* If the value for this key is swapped, load a preview in memory.
-             * We use a "swapped" flag to remember if we need to free the
-             * value object instead to just increment the ref count anyway
-             * in order to avoid copy-on-write of pages if we are forked() */
-            if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
-                o->storage == REDIS_VM_SWAPPING) {
-                swapped = 0;
-            } else {
-                o = vmPreviewObject(o);
-                swapped = 1;
-            }
+
             expiretime = getExpire(db,&key);
 
             /* Save the key and associated value */
@@ -457,37 +413,81 @@ int rewriteAppendOnlyFile(char *filename) {
                     redisPanic("Unknown list encoding");
                 }
             } else if (o->type == REDIS_SET) {
-                /* Emit the SADDs needed to rebuild the set */
-                dict *set = o->ptr;
-                dictIterator *di = dictGetIterator(set);
-                dictEntry *de;
-
-                while((de = dictNext(di)) != NULL) {
-                    char cmd[]="*3\r\n$4\r\nSADD\r\n";
-                    robj *eleobj = dictGetEntryKey(de);
+                char cmd[]="*3\r\n$4\r\nSADD\r\n";
 
-                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                /* Emit the SADDs needed to rebuild the set */
+                if (o->encoding == REDIS_ENCODING_INTSET) {
+                    int ii = 0;
+                    int64_t llval;
+                    while(intsetGet(o->ptr,ii++,&llval)) {
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
+                    }
+                } else if (o->encoding == REDIS_ENCODING_HT) {
+                    dictIterator *di = dictGetIterator(o->ptr);
+                    dictEntry *de;
+                    while((de = dictNext(di)) != NULL) {
+                        robj *eleobj = dictGetEntryKey(de);
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                    }
+                    dictReleaseIterator(di);
+                } else {
+                    redisPanic("Unknown set encoding");
                 }
-                dictReleaseIterator(di);
             } else if (o->type == REDIS_ZSET) {
                 /* Emit the ZADDs needed to rebuild the sorted set */
-                zset *zs = o->ptr;
-                dictIterator *di = dictGetIterator(zs->dict);
-                dictEntry *de;
-
-                while((de = dictNext(di)) != NULL) {
-                    char cmd[]="*4\r\n$4\r\nZADD\r\n";
-                    robj *eleobj = dictGetEntryKey(de);
-                    double *score = dictGetEntryVal(de);
-
-                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                    if (fwriteBulkDouble(fp,*score) == 0) goto werr;
-                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                char cmd[]="*4\r\n$4\r\nZADD\r\n";
+
+                if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+                    unsigned char *zl = o->ptr;
+                    unsigned char *eptr, *sptr;
+                    unsigned char *vstr;
+                    unsigned int vlen;
+                    long long vll;
+                    double score;
+
+                    eptr = ziplistIndex(zl,0);
+                    redisAssert(eptr != NULL);
+                    sptr = ziplistNext(zl,eptr);
+                    redisAssert(sptr != NULL);
+
+                    while (eptr != NULL) {
+                        redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
+                        score = zzlGetScore(sptr);
+
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkDouble(fp,score) == 0) goto werr;
+                        if (vstr != NULL) {
+                            if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
+                                goto werr;
+                        } else {
+                            if (fwriteBulkLongLong(fp,vll) == 0)
+                                goto werr;
+                        }
+                        zzlNext(zl,&eptr,&sptr);
+                    }
+                } else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
+                    zset *zs = o->ptr;
+                    dictIterator *di = dictGetIterator(zs->dict);
+                    dictEntry *de;
+
+                    while((de = dictNext(di)) != NULL) {
+                        robj *eleobj = dictGetEntryKey(de);
+                        double *score = dictGetEntryVal(de);
+
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkDouble(fp,*score) == 0) goto werr;
+                        if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                    }
+                    dictReleaseIterator(di);
+                } else {
+                    redisPanic("Unknown sorted set encoding");
                 }
-                dictReleaseIterator(di);
             } else if (o->type == REDIS_HASH) {
                 char cmd[]="*4\r\n$4\r\nHSET\r\n";
 
@@ -500,10 +500,10 @@ int rewriteAppendOnlyFile(char *filename) {
                     while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                        if (fwriteBulkString(fp,(char*)field,flen) == -1)
-                            return -1;
-                        if (fwriteBulkString(fp,(char*)val,vlen) == -1)
-                            return -1;
+                        if (fwriteBulkString(fp,(char*)field,flen) == 0)
+                            goto werr;
+                        if (fwriteBulkString(fp,(char*)val,vlen) == 0)
+                            goto werr;
                     }
                 } else {
                     dictIterator *di = dictGetIterator(o->ptr);
@@ -515,8 +515,8 @@ int rewriteAppendOnlyFile(char *filename) {
 
                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                        if (fwriteBulkObject(fp,field) == -1) return -1;
-                        if (fwriteBulkObject(fp,val) == -1) return -1;
+                        if (fwriteBulkObject(fp,field) == 0) goto werr;
+                        if (fwriteBulkObject(fp,val) == 0) goto werr;
                     }
                     dictReleaseIterator(di);
                 }
@@ -532,7 +532,6 @@ int rewriteAppendOnlyFile(char *filename) {
                 if (fwriteBulkObject(fp,&key) == 0) goto werr;
                 if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
             }
-            if (swapped) decrRefCount(o);
         }
         dictReleaseIterator(di);
     }
@@ -574,15 +573,16 @@ werr:
  */
 int rewriteAppendOnlyFileBackground(void) {
     pid_t childpid;
+    long long start;
 
     if (server.bgrewritechildpid != -1) return REDIS_ERR;
-    if (server.vm_enabled) waitEmptyIOJobsQueue();
+    start = ustime();
     if ((childpid = fork()) == 0) {
-        /* Child */
         char tmpfile[256];
 
-        if (server.vm_enabled) vmReopenSwapFile();
-        close(server.fd);
+        /* Child */
+        if (server.ipfd > 0) close(server.ipfd);
+        if (server.sofd > 0) close(server.sofd);
         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
         if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
             _exit(0);
@@ -591,6 +591,7 @@ int rewriteAppendOnlyFileBackground(void) {
         }
     } else {
         /* Parent */
+        server.stat_fork_time = ustime()-start;
         if (childpid == -1) {
             redisLog(REDIS_WARNING,
                 "Can't rewrite append only file in background: fork: %s",
@@ -613,12 +614,12 @@ int rewriteAppendOnlyFileBackground(void) {
 
 void bgrewriteaofCommand(redisClient *c) {
     if (server.bgrewritechildpid != -1) {
-        addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
-        return;
-    }
-    if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
-        char *status = "+Background append only file rewriting started\r\n";
-        addReplySds(c,sdsnew(status));
+        addReplyError(c,"Background append only file rewriting already in progress");
+    } else if (server.bgsavechildpid != -1) {
+        server.aofrewrite_scheduled = 1;
+        addReplyStatus(c,"Background append only file rewriting scheduled");
+    } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
+        addReplyStatus(c,"Background append only file rewriting started");
     } else {
         addReply(c,shared.err);
     }
@@ -631,12 +632,24 @@ void aofRemoveTempFile(pid_t childpid) {
     unlink(tmpfile);
 }
 
+/* Update the server.appendonly_current_size filed explicitly using stat(2)
+ * to check the size of the file. This is useful after a rewrite or after
+ * a restart, normally the size is updated just adding the write length
+ * to the current lenght, that is much faster. */
+void aofUpdateCurrentSize(void) {
+    struct redis_stat sb;
+
+    if (redis_fstat(server.appendfd,&sb) == -1) {
+        redisLog(REDIS_WARNING,"Unable to check the AOF length: %s",
+            strerror(errno));
+    } else {
+        server.appendonly_current_size = sb.st_size;
+    }
+}
+
 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
  * Handle this. */
-void backgroundRewriteDoneHandler(int statloc) {
-    int exitcode = WEXITSTATUS(statloc);
-    int bysignal = WIFSIGNALED(statloc);
-
+void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     if (!bysignal && exitcode == 0) {
         int fd;
         char tmpfile[256];
@@ -674,6 +687,8 @@ void backgroundRewriteDoneHandler(int statloc) {
             if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd);
             server.appendseldb = -1; /* Make sure it will issue SELECT */
             redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
+            aofUpdateCurrentSize();
+            server.auto_aofrewrite_base_size = server.appendonly_current_size;
         } else {
             /* If append only is disabled we just generate a dump in this
              * format. Why not? */
@@ -684,7 +699,7 @@ void backgroundRewriteDoneHandler(int statloc) {
     } else {
         redisLog(REDIS_WARNING,
             "Background append only file rewriting terminated by signal %d",
-            WTERMSIG(statloc));
+            bysignal);
     }
 cleanup:
     sdsfree(server.bgrewritebuf);