]> git.saurik.com Git - redis.git/blobdiff - src/aof.c
added new RDB codes for ziplist encoded lists and intset encodeed sets
[redis.git] / src / aof.c
index f8b92d2d3301ebee556c02612c0123461b195728..56392f2aac7d0757458556c2c58ced2f316a4f3e 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -189,6 +189,7 @@ struct redisClient *createFakeClient(void) {
     c->querybuf = sdsempty();
     c->argc = 0;
     c->argv = NULL;
+    c->bufpos = 0;
     c->flags = 0;
     /* We set the fake client as a slave waiting for the synchronization
      * so that Redis will not try to send replies to this client. */
@@ -217,6 +218,7 @@ int loadAppendOnlyFile(char *filename) {
     FILE *fp = fopen(filename,"r");
     struct redis_stat sb;
     int appendonly = server.appendonly;
+    long loops = 0;
 
     if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
         return REDIS_ERR;
@@ -231,6 +233,8 @@ int loadAppendOnlyFile(char *filename) {
     server.appendonly = 0;
 
     fakeClient = createFakeClient();
+    startLoading(fp);
+
     while(1) {
         int argc, j;
         unsigned long len;
@@ -238,7 +242,12 @@ int loadAppendOnlyFile(char *filename) {
         char buf[128];
         sds argsds;
         struct redisCommand *cmd;
-        int force_swapout;
+
+        /* Serve the clients from time to time */
+        if (!(loops++ % 1000)) {
+            loadingProgress(ftello(fp));
+            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
+        }
 
         if (fgets(buf,sizeof(buf),fp) == NULL) {
             if (feof(fp))
@@ -265,29 +274,17 @@ int loadAppendOnlyFile(char *filename) {
             redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
             exit(1);
         }
-        /* Try object encoding */
-        if (cmd->flags & REDIS_CMD_BULK)
-            argv[argc-1] = tryObjectEncoding(argv[argc-1]);
         /* Run the command in the context of a fake client */
         fakeClient->argc = argc;
         fakeClient->argv = argv;
         cmd->proc(fakeClient);
-        /* Discard the reply objects list from the fake client */
-        while(listLength(fakeClient->reply))
-            listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
+
+        /* The fake client should not have a reply */
+        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
+
         /* Clean up, ready for the next command */
         for (j = 0; j < argc; j++) decrRefCount(argv[j]);
         zfree(argv);
-        /* Handle swapping while loading big datasets when VM is on */
-        force_swapout = 0;
-        if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
-            force_swapout = 1;
-
-        if (server.vm_enabled && force_swapout) {
-            while (zmalloc_used_memory() > server.vm_max_memory) {
-                if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
-            }
-        }
     }
 
     /* This point can only be reached when EOF is reached without errors.
@@ -297,6 +294,7 @@ int loadAppendOnlyFile(char *filename) {
     fclose(fp);
     freeFakeClient(fakeClient);
     server.appendonly = appendonly;
+    stopLoading();
     return REDIS_OK;
 
 readerr:
@@ -307,59 +305,10 @@ readerr:
     }
     exit(1);
 fmterr:
-    redisLog(REDIS_WARNING,"Bad file format reading the append only file");
+    redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
     exit(1);
 }
 
-/* Write binary-safe string into a file in the bulkformat
- * $<count>\r\n<payload>\r\n */
-int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
-    char cbuf[128];
-    int clen;
-    cbuf[0] = '$';
-    clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len);
-    cbuf[clen++] = '\r';
-    cbuf[clen++] = '\n';
-    if (fwrite(cbuf,clen,1,fp) == 0) return 0;
-    if (len > 0 && fwrite(s,len,1,fp) == 0) return 0;
-    if (fwrite("\r\n",2,1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Write a double value in bulk format $<count>\r\n<payload>\r\n */
-int fwriteBulkDouble(FILE *fp, double d) {
-    char buf[128], dbuf[128];
-
-    snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
-    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2);
-    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
-    if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Write a long value in bulk format $<count>\r\n<payload>\r\n */
-int fwriteBulkLongLong(FILE *fp, long long l) {
-    char bbuf[128], lbuf[128];
-    unsigned int blen, llen;
-    llen = ll2string(lbuf,32,l);
-    blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf);
-    if (fwrite(bbuf,blen,1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Delegate writing an object to writing a bulk string or bulk long long. */
-int fwriteBulkObject(FILE *fp, robj *obj) {
-    /* Avoid using getDecodedObject to help copy-on-write (we are often
-     * in a child process when this function is called). */
-    if (obj->encoding == REDIS_ENCODING_INT) {
-        return fwriteBulkLongLong(fp,(long)obj->ptr);
-    } else if (obj->encoding == REDIS_ENCODING_RAW) {
-        return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr));
-    } else {
-        redisPanic("Unknown string encoding");
-    }
-}
-
 /* Write a sequence of commands able to fully rebuild the dataset into
  * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
 int rewriteAppendOnlyFile(char *filename) {
@@ -398,22 +347,11 @@ int rewriteAppendOnlyFile(char *filename) {
             sds keystr = dictGetEntryKey(de);
             robj key, *o;
             time_t expiretime;
-            int swapped;
 
             keystr = dictGetEntryKey(de);
             o = dictGetEntryVal(de);
             initStaticStringObject(key,keystr);
-            /* If the value for this key is swapped, load a preview in memory.
-             * We use a "swapped" flag to remember if we need to free the
-             * value object instead to just increment the ref count anyway
-             * in order to avoid copy-on-write of pages if we are forked() */
-            if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
-                o->storage == REDIS_VM_SWAPPING) {
-                swapped = 0;
-            } else {
-                o = vmPreviewObject(o);
-                swapped = 1;
-            }
+
             expiretime = getExpire(db,&key);
 
             /* Save the key and associated value */
@@ -463,20 +401,30 @@ int rewriteAppendOnlyFile(char *filename) {
                     redisPanic("Unknown list encoding");
                 }
             } else if (o->type == REDIS_SET) {
-                /* Emit the SADDs needed to rebuild the set */
-                dict *set = o->ptr;
-                dictIterator *di = dictGetIterator(set);
-                dictEntry *de;
+                char cmd[]="*3\r\n$4\r\nSADD\r\n";
 
-                while((de = dictNext(di)) != NULL) {
-                    char cmd[]="*3\r\n$4\r\nSADD\r\n";
-                    robj *eleobj = dictGetEntryKey(de);
-
-                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                /* Emit the SADDs needed to rebuild the set */
+                if (o->encoding == REDIS_ENCODING_INTSET) {
+                    int ii = 0;
+                    int64_t llval;
+                    while(intsetGet(o->ptr,ii++,&llval)) {
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
+                    }
+                } else if (o->encoding == REDIS_ENCODING_HT) {
+                    dictIterator *di = dictGetIterator(o->ptr);
+                    dictEntry *de;
+                    while((de = dictNext(di)) != NULL) {
+                        robj *eleobj = dictGetEntryKey(de);
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                    }
+                    dictReleaseIterator(di);
+                } else {
+                    redisPanic("Unknown set encoding");
                 }
-                dictReleaseIterator(di);
             } else if (o->type == REDIS_ZSET) {
                 /* Emit the ZADDs needed to rebuild the sorted set */
                 zset *zs = o->ptr;
@@ -538,7 +486,6 @@ int rewriteAppendOnlyFile(char *filename) {
                 if (fwriteBulkObject(fp,&key) == 0) goto werr;
                 if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
             }
-            if (swapped) decrRefCount(o);
         }
         dictReleaseIterator(di);
     }
@@ -582,13 +529,16 @@ int rewriteAppendOnlyFileBackground(void) {
     pid_t childpid;
 
     if (server.bgrewritechildpid != -1) return REDIS_ERR;
-    if (server.vm_enabled) waitEmptyIOJobsQueue();
+    if (server.ds_enabled != 0) {
+        redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. Operation not performed.");
+        return REDIS_ERR;
+    }
     if ((childpid = fork()) == 0) {
         /* Child */
         char tmpfile[256];
 
-        if (server.vm_enabled) vmReopenSwapFile();
-        close(server.fd);
+        if (server.ipfd > 0) close(server.ipfd);
+        if (server.sofd > 0) close(server.sofd);
         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
         if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
             _exit(0);
@@ -619,12 +569,11 @@ int rewriteAppendOnlyFileBackground(void) {
 
 void bgrewriteaofCommand(redisClient *c) {
     if (server.bgrewritechildpid != -1) {
-        addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
+        addReplyError(c,"Background append only file rewriting already in progress");
         return;
     }
     if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
-        char *status = "+Background append only file rewriting started\r\n";
-        addReplySds(c,sdsnew(status));
+        addReplyStatus(c,"Background append only file rewriting started");
     } else {
         addReply(c,shared.err);
     }
@@ -639,10 +588,7 @@ void aofRemoveTempFile(pid_t childpid) {
 
 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
  * Handle this. */
-void backgroundRewriteDoneHandler(int statloc) {
-    int exitcode = WEXITSTATUS(statloc);
-    int bysignal = WIFSIGNALED(statloc);
-
+void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     if (!bysignal && exitcode == 0) {
         int fd;
         char tmpfile[256];
@@ -690,7 +636,7 @@ void backgroundRewriteDoneHandler(int statloc) {
     } else {
         redisLog(REDIS_WARNING,
             "Background append only file rewriting terminated by signal %d",
-            WTERMSIG(statloc));
+            bysignal);
     }
 cleanup:
     sdsfree(server.bgrewritebuf);