]> git.saurik.com Git - redis.git/blobdiff - src/migrate.c
Fix key archival configuration from static config.
[redis.git] / src / migrate.c
index 9087531216469ee9bf0d81a11da209c9297d1b1a..a08aa9611205ad493b555bd38796a29ab86f4948 100644 (file)
@@ -1,10 +1,16 @@
 #include "redis.h"
 #include "endianconv.h"
 
+#include <sys/stat.h>
+#include <lmdb.h>
+
 /* -----------------------------------------------------------------------------
  * DUMP, RESTORE and MIGRATE commands
  * -------------------------------------------------------------------------- */
 
+MDB_env *env;
+MDB_dbi dbi;
+
 /* Generates a DUMP-format representation of the object 'o', adding it to the
  * io stream pointed by 'rio'. This function can't fail. */
 void createDumpPayload(rio *payload, robj *o) {
@@ -30,7 +36,7 @@ void createDumpPayload(rio *payload, robj *o) {
     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
 
     /* CRC64 */
-    crc = crc64((unsigned char*)payload->io.buffer.ptr,
+    crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
                 sdslen(payload->io.buffer.ptr));
     memrev64ifbe(&crc);
     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
@@ -54,7 +60,7 @@ int verifyDumpPayload(unsigned char *p, size_t len) {
     if (rdbver != REDIS_RDB_VERSION) return REDIS_ERR;
 
     /* Verify CRC64 */
-    crc = crc64(p,len-8);
+    crc = crc64(0,p,len-8);
     memrev64ifbe(&crc);
     return (memcmp(&crc,footer+2,8) == 0) ? REDIS_OK : REDIS_ERR;
 }
@@ -130,7 +136,7 @@ void migrateCommand(redisClient *c) {
     int fd;
     long timeout;
     long dbid;
-    time_t ttl;
+    long long ttl = 0, expireat;
     robj *o;
     rio cmd, payload;
 
@@ -139,7 +145,7 @@ void migrateCommand(redisClient *c) {
         return;
     if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
         return;
-    if (timeout <= 0) timeout = 1;
+    if (timeout <= 0) timeout = 1000;
 
     /* Check if the key is here. If not we reply with success as there is
      * nothing to migrate (for instance the key expired in the meantime), but
@@ -157,7 +163,8 @@ void migrateCommand(redisClient *c) {
             server.neterr);
         return;
     }
-    if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
+    if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
+        close(fd);
         addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
         return;
     }
@@ -168,12 +175,16 @@ void migrateCommand(redisClient *c) {
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
 
-    ttl = getExpire(c->db,c->argv[3]);
+    expireat = getExpire(c->db,c->argv[3]);
+    if (expireat != -1) {
+        ttl = expireat-mstime();
+        if (ttl < 1) ttl = 1;
+    }
     redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
     redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
-    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
+    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
 
     /* Finally the last argument that is the serailized object payload
      * in the DUMP format. */
@@ -190,7 +201,7 @@ void migrateCommand(redisClient *c) {
 
         while ((towrite = sdslen(buf)-pos) > 0) {
             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
-            nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
+            nwritten = syncWrite(fd,buf+pos,towrite,timeout);
             if (nwritten != (signed)towrite) goto socket_wr_err;
             pos += nwritten;
         }
@@ -240,3 +251,150 @@ socket_rd_err:
     close(fd);
     return;
 }
+
+void stopKeyArchive(void) {
+    redisAssert(env != NULL);
+
+    mdb_dbi_close(env, dbi);
+    mdb_env_close(env);
+    env = NULL;
+
+    server.mdb_state = REDIS_MDB_OFF;
+}
+
+int startKeyArchive(void) {
+    redisAssert(env == NULL);
+
+    int ret;
+
+    ret = mdb_env_create(&env);
+    if (ret != 0) return ret;
+
+    ret = mdb_env_set_mapsize(env, server.mdb_mapsize);
+    if (ret != 0) return ret;
+
+    ret = mdb_env_set_maxdbs(env, 1);
+    if (ret != 0) return ret;
+
+    mkdir(server.mdb_environment, 0644);
+
+    ret = mdb_env_open(env, server.mdb_environment, MDB_FIXEDMAP | MDB_NOSYNC, 0664);
+    if (ret != 0) return ret;
+
+    MDB_txn *txn;
+    ret = mdb_txn_begin(env, NULL, 0, &txn);
+    if (ret != 0) return ret;
+
+    ret = mdb_open(txn, NULL, 0, &dbi);
+    if (ret != 0) return ret;
+
+    mdb_txn_commit(txn);
+
+    server.mdb_state = REDIS_MDB_ON;
+    return 0;
+}
+
+int archive(redisDb *db, robj *key) {
+    if (server.mdb_state == REDIS_MDB_OFF)
+        return 1;
+    redisAssert(env != NULL);
+
+    MDB_val kval;
+    kval.mv_data = key->ptr;
+    kval.mv_size = sdslen((sds)key->ptr);
+
+    robj *object;
+    object = lookupKey(db, key);
+    if (object == NULL)
+        return 0;
+
+    if (object->archived != 0)
+        return 1;
+
+    rio payload;
+    createDumpPayload(&payload, object);
+
+    MDB_val dval;
+    dval.mv_size = sdslen(payload.io.buffer.ptr);
+    dval.mv_data = payload.io.buffer.ptr;
+
+    int ret;
+
+    MDB_txn *txn;
+    ret = mdb_txn_begin(env, NULL, 0, &txn);
+    if (ret != 0)
+        goto archive_err;
+
+    ret = mdb_put(txn, dbi, &kval, &dval, 0);
+    if (ret != 0) {
+        mdb_txn_abort(txn);
+        goto archive_err;
+    }
+
+    mdb_txn_commit(txn);
+    sdsfree(payload.io.buffer.ptr);
+    return 1;
+
+archive_err:
+    sdsfree(payload.io.buffer.ptr);
+    redisAssert(0);
+    return 0;
+}
+
+robj *recover(redisDb *db, robj *key) {
+    if (server.mdb_state == REDIS_MDB_OFF)
+        return NULL;
+
+    int ret;
+
+    MDB_val kval;
+    kval.mv_data = key->ptr;
+    kval.mv_size = sdslen((sds)key->ptr);
+
+    MDB_txn *txn;
+    ret = mdb_txn_begin(env, NULL, 0, &txn);
+    if (ret != 0)
+        return NULL;
+
+    MDB_cursor *cursor;
+    ret = mdb_cursor_open(txn, dbi, &cursor);
+    if (ret != 0) {
+        mdb_txn_abort(txn);
+        return NULL;
+    }
+
+    MDB_val pval;
+    ret = mdb_cursor_get(cursor, &kval, &pval, MDB_SET);
+    if (ret != 0) {
+        mdb_txn_abort(txn);
+        return NULL;
+    }
+
+    sds sval = sdsnewlen(pval.mv_data, pval.mv_size);
+    mdb_cursor_close(cursor);
+    mdb_txn_abort(txn);
+
+    rio payload;
+    rioInitWithBuffer(&payload, sval);
+
+    int type = rdbLoadObjectType(&payload);
+    if (type == -1)
+        goto recover_err;
+
+    robj *object = rdbLoadObject(type, &payload);
+    if (object == NULL)
+        goto recover_err;
+
+    object->archived = 1;
+
+    dbAdd(db, key, object);
+    signalModifiedKey(db, key);
+    server.dirty++;
+
+    sdsfree(sval);
+    return object;
+
+recover_err:
+    sdsfree(sval);
+    return NULL;
+}