]> git.saurik.com Git - redis.git/blobdiff - src/aof.c
Refactor code for BRPOPLPUSH.
[redis.git] / src / aof.c
index f8b92d2d3301ebee556c02612c0123461b195728..959a5f52af6a6a4baa1f3def8165d08758b72c2c 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -189,6 +189,7 @@ struct redisClient *createFakeClient(void) {
     c->querybuf = sdsempty();
     c->argc = 0;
     c->argv = NULL;
+    c->bufpos = 0;
     c->flags = 0;
     /* We set the fake client as a slave waiting for the synchronization
      * so that Redis will not try to send replies to this client. */
@@ -217,6 +218,7 @@ int loadAppendOnlyFile(char *filename) {
     FILE *fp = fopen(filename,"r");
     struct redis_stat sb;
     int appendonly = server.appendonly;
+    long loops = 0;
 
     if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
         return REDIS_ERR;
@@ -231,6 +233,8 @@ int loadAppendOnlyFile(char *filename) {
     server.appendonly = 0;
 
     fakeClient = createFakeClient();
+    startLoading(fp);
+
     while(1) {
         int argc, j;
         unsigned long len;
@@ -240,6 +244,12 @@ int loadAppendOnlyFile(char *filename) {
         struct redisCommand *cmd;
         int force_swapout;
 
+        /* Serve the clients from time to time */
+        if (!(loops++ % 1000)) {
+            loadingProgress(ftello(fp));
+            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
+        }
+
         if (fgets(buf,sizeof(buf),fp) == NULL) {
             if (feof(fp))
                 break;
@@ -265,19 +275,18 @@ int loadAppendOnlyFile(char *filename) {
             redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
             exit(1);
         }
-        /* Try object encoding */
-        if (cmd->flags & REDIS_CMD_BULK)
-            argv[argc-1] = tryObjectEncoding(argv[argc-1]);
         /* Run the command in the context of a fake client */
         fakeClient->argc = argc;
         fakeClient->argv = argv;
         cmd->proc(fakeClient);
-        /* Discard the reply objects list from the fake client */
-        while(listLength(fakeClient->reply))
-            listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
+
+        /* The fake client should not have a reply */
+        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
+
         /* Clean up, ready for the next command */
         for (j = 0; j < argc; j++) decrRefCount(argv[j]);
         zfree(argv);
+
         /* Handle swapping while loading big datasets when VM is on */
         force_swapout = 0;
         if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
@@ -297,6 +306,7 @@ int loadAppendOnlyFile(char *filename) {
     fclose(fp);
     freeFakeClient(fakeClient);
     server.appendonly = appendonly;
+    stopLoading();
     return REDIS_OK;
 
 readerr:
@@ -307,59 +317,10 @@ readerr:
     }
     exit(1);
 fmterr:
-    redisLog(REDIS_WARNING,"Bad file format reading the append only file");
+    redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
     exit(1);
 }
 
-/* Write binary-safe string into a file in the bulkformat
- * $<count>\r\n<payload>\r\n */
-int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
-    char cbuf[128];
-    int clen;
-    cbuf[0] = '$';
-    clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len);
-    cbuf[clen++] = '\r';
-    cbuf[clen++] = '\n';
-    if (fwrite(cbuf,clen,1,fp) == 0) return 0;
-    if (len > 0 && fwrite(s,len,1,fp) == 0) return 0;
-    if (fwrite("\r\n",2,1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Write a double value in bulk format $<count>\r\n<payload>\r\n */
-int fwriteBulkDouble(FILE *fp, double d) {
-    char buf[128], dbuf[128];
-
-    snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d);
-    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2);
-    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
-    if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Write a long value in bulk format $<count>\r\n<payload>\r\n */
-int fwriteBulkLongLong(FILE *fp, long long l) {
-    char bbuf[128], lbuf[128];
-    unsigned int blen, llen;
-    llen = ll2string(lbuf,32,l);
-    blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf);
-    if (fwrite(bbuf,blen,1,fp) == 0) return 0;
-    return 1;
-}
-
-/* Delegate writing an object to writing a bulk string or bulk long long. */
-int fwriteBulkObject(FILE *fp, robj *obj) {
-    /* Avoid using getDecodedObject to help copy-on-write (we are often
-     * in a child process when this function is called). */
-    if (obj->encoding == REDIS_ENCODING_INT) {
-        return fwriteBulkLongLong(fp,(long)obj->ptr);
-    } else if (obj->encoding == REDIS_ENCODING_RAW) {
-        return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr));
-    } else {
-        redisPanic("Unknown string encoding");
-    }
-}
-
 /* Write a sequence of commands able to fully rebuild the dataset into
  * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
 int rewriteAppendOnlyFile(char *filename) {
@@ -463,20 +424,30 @@ int rewriteAppendOnlyFile(char *filename) {
                     redisPanic("Unknown list encoding");
                 }
             } else if (o->type == REDIS_SET) {
-                /* Emit the SADDs needed to rebuild the set */
-                dict *set = o->ptr;
-                dictIterator *di = dictGetIterator(set);
-                dictEntry *de;
+                char cmd[]="*3\r\n$4\r\nSADD\r\n";
 
-                while((de = dictNext(di)) != NULL) {
-                    char cmd[]="*3\r\n$4\r\nSADD\r\n";
-                    robj *eleobj = dictGetEntryKey(de);
-
-                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                /* Emit the SADDs needed to rebuild the set */
+                if (o->encoding == REDIS_ENCODING_INTSET) {
+                    int ii = 0;
+                    int64_t llval;
+                    while(intsetGet(o->ptr,ii++,&llval)) {
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
+                    }
+                } else if (o->encoding == REDIS_ENCODING_HT) {
+                    dictIterator *di = dictGetIterator(o->ptr);
+                    dictEntry *de;
+                    while((de = dictNext(di)) != NULL) {
+                        robj *eleobj = dictGetEntryKey(de);
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                    }
+                    dictReleaseIterator(di);
+                } else {
+                    redisPanic("Unknown set encoding");
                 }
-                dictReleaseIterator(di);
             } else if (o->type == REDIS_ZSET) {
                 /* Emit the ZADDs needed to rebuild the sorted set */
                 zset *zs = o->ptr;
@@ -588,7 +559,8 @@ int rewriteAppendOnlyFileBackground(void) {
         char tmpfile[256];
 
         if (server.vm_enabled) vmReopenSwapFile();
-        close(server.fd);
+        if (server.ipfd > 0) close(server.ipfd);
+        if (server.sofd > 0) close(server.sofd);
         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
         if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
             _exit(0);
@@ -619,12 +591,11 @@ int rewriteAppendOnlyFileBackground(void) {
 
 void bgrewriteaofCommand(redisClient *c) {
     if (server.bgrewritechildpid != -1) {
-        addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
+        addReplyError(c,"Background append only file rewriting already in progress");
         return;
     }
     if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
-        char *status = "+Background append only file rewriting started\r\n";
-        addReplySds(c,sdsnew(status));
+        addReplyStatus(c,"Background append only file rewriting started");
     } else {
         addReply(c,shared.err);
     }