]> git.saurik.com Git - redis.git/blobdiff - redis.c
Initial implementation of append-only mode. Loading still not implemented.
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index 85df6d316a21d2f14bff435b686d09e78bf72a34..19abda2067bf608efcfb4234d17105fe92dbf428 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -261,6 +261,9 @@ struct redisServer {
     int maxidletime;
     int dbnum;
     int daemonize;
+    int appendonly;
+    int appendfd;
+    int appendseldb;
     char *pidfile;
     int bgsaveinprogress;
     pid_t bgsavechildpid;
@@ -269,6 +272,7 @@ struct redisServer {
     char *logfile;
     char *bindaddr;
     char *dbfilename;
+    char *appendfilename;
     char *requirepass;
     int shareobjects;
     /* Replication related */
@@ -364,6 +368,7 @@ static void incrRefCount(robj *o);
 static int rdbSaveBackground(char *filename);
 static robj *createStringObject(char *ptr, size_t len);
 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
+static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
 static int syncWithMaster(void);
 static robj *tryObjectSharing(robj *o);
 static int tryObjectEncoding(robj *o);
@@ -440,6 +445,7 @@ static void infoCommand(redisClient *c);
 static void mgetCommand(redisClient *c);
 static void monitorCommand(redisClient *c);
 static void expireCommand(redisClient *c);
+static void expireatCommand(redisClient *c);
 static void getsetCommand(redisClient *c);
 static void ttlCommand(redisClient *c);
 static void slaveofCommand(redisClient *c);
@@ -511,6 +517,7 @@ static struct redisCommand cmdTable[] = {
     {"rename",renameCommand,3,REDIS_CMD_INLINE},
     {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
     {"expire",expireCommand,3,REDIS_CMD_INLINE},
+    {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
     {"keys",keysCommand,2,REDIS_CMD_INLINE},
     {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
     {"auth",authCommand,2,REDIS_CMD_INLINE},
@@ -1021,8 +1028,12 @@ static void initServerConfig() {
     server.bindaddr = NULL;
     server.glueoutputbuf = 1;
     server.daemonize = 0;
+    server.appendonly = 0;
+    server.appendfd = -1;
+    server.appendseldb = -1; /* Make sure the first time will not match */
     server.pidfile = "/var/run/redis.pid";
     server.dbfilename = "dump.rdb";
+    server.appendfilename = "appendonly.log";
     server.requirepass = NULL;
     server.shareobjects = 0;
     server.sharingpoolsize = 1024;
@@ -1082,6 +1093,15 @@ static void initServer() {
     server.stat_numconnections = 0;
     server.stat_starttime = time(NULL);
     aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
+
+    if (server.appendonly) {
+        server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT);
+        if (server.appendfd == -1) {
+            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
+                strerror(errno));
+            exit(1);
+        }
+    }
 }
 
 /* Empty the whole database */
@@ -1221,6 +1241,10 @@ static void loadServerConfig(char *filename) {
             if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
+        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
+            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
+                err = "argument must be 'yes' or 'no'"; goto loaderr;
+            }
         } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
           server.requirepass = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
@@ -1536,6 +1560,8 @@ static int processCommand(redisClient *c) {
     /* Exec the command */
     dirty = server.dirty;
     cmd->proc(c);
+    if (server.appendonly != 0)
+        feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
     if (server.dirty-dirty != 0 && listLength(server.slaves))
         replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
     if (listLength(server.monitors))
@@ -1620,6 +1646,54 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
     if (outv != static_outv) zfree(outv);
 }
 
+/* TODO: translate EXPIREs into EXPIRETOs */
+static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
+    sds buf = sdsempty();
+    int j;
+    ssize_t nwritten;
+
+    /* The DB this command was targetting is not the same as the last command
+     * we appendend. To issue a SELECT command is needed. */
+    if (dictid != server.appendseldb) {
+        char seldb[64];
+
+        snprintf(seldb,sizeof(seldb),"%d",dictid);
+        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
+            strlen(seldb),seldb);
+    }
+    /* Append the actual command */
+    buf = sdscatprintf(buf,"*%d\r\n",argc);
+    for (j = 0; j < argc; j++) {
+        robj *o = argv[j];
+
+        if (o->encoding != REDIS_ENCODING_RAW)
+            o = getDecodedObject(o);
+        buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
+        buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
+        buf = sdscatlen(buf,"\r\n",2);
+        if (o != argv[j])
+            decrRefCount(o);
+    }
+    /* We want to perform a single write. This should be guaranteed atomic
+     * at least if the filesystem we are writing is a real physical one.
+     * While this will save us against the server being killed I don't think
+     * there is much to do about the whole server stopping for power problems
+     * or alike */
+     nwritten = write(server.appendfd,buf,sdslen(buf));
+     if (nwritten != (unsigned)sdslen(buf)) {
+        /* Ooops, we are in troubles. The best thing to do for now is
+         * to simply exit instead to give the illusion that everything is
+         * working as expected. */
+         if (nwritten == -1) {
+            redisLog(REDIS_WARNING,"Aborting on error writing to the append-only file: %s",strerror(errno));
+         } else {
+            redisLog(REDIS_WARNING,"Aborting on short write while writing to the append-only file: %s",strerror(errno));
+         }
+         abort();
+     }
+     fsync(server.appendfd); /* Let's try to get this data on the disk */
+}
+
 static void processInputBuffer(redisClient *c) {
 again:
     if (c->bulklen == -1) {
@@ -4736,21 +4810,21 @@ static int deleteIfVolatile(redisDb *db, robj *key) {
     return dictDelete(db->dict,key) == DICT_OK;
 }
 
-static void expireCommand(redisClient *c) {
+static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
     dictEntry *de;
-    int seconds = atoi(c->argv[2]->ptr);
 
-    de = dictFind(c->db->dict,c->argv[1]);
+    de = dictFind(c->db->dict,key);
     if (de == NULL) {
         addReply(c,shared.czero);
         return;
     }
-    if (seconds <= 0) {
-        addReply(c, shared.czero);
+    if (seconds < 0) {
+        if (deleteKey(c->db,key)) server.dirty++;
+        addReply(c, shared.cone);
         return;
     } else {
         time_t when = time(NULL)+seconds;
-        if (setExpire(c->db,c->argv[1],when)) {
+        if (setExpire(c->db,key,when)) {
             addReply(c,shared.cone);
             server.dirty++;
         } else {
@@ -4760,6 +4834,14 @@ static void expireCommand(redisClient *c) {
     }
 }
 
+static void expireCommand(redisClient *c) {
+    expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
+}
+
+static void expireatCommand(redisClient *c) {
+    expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
+}
+
 static void ttlCommand(redisClient *c) {
     time_t expire;
     int ttl = -1;
@@ -5312,6 +5394,7 @@ static struct redisFunctionSym symsTable[] = {
 {"mgetCommand", (unsigned long)mgetCommand},
 {"monitorCommand", (unsigned long)monitorCommand},
 {"expireCommand", (unsigned long)expireCommand},
+{"expireatCommand", (unsigned long)expireatCommand},
 {"getsetCommand", (unsigned long)getsetCommand},
 {"ttlCommand", (unsigned long)ttlCommand},
 {"slaveofCommand", (unsigned long)slaveofCommand},