]> git.saurik.com Git - redis.git/blobdiff - src/cluster.c
A reimplementation of blocking operation internals.
[redis.git] / src / cluster.c
index d0de9596a415dc6705cc6539aa5f853b4d0cce32..57243132d0da6656655d59c63b7bd0614b89f852 100644 (file)
@@ -1,9 +1,10 @@
 #include "redis.h"
 #include "redis.h"
-#include "sha1.h"
+#include "endianconv.h"
 
 #include <arpa/inet.h>
 #include <fcntl.h>
 #include <unistd.h>
 
 #include <arpa/inet.h>
 #include <fcntl.h>
 #include <unistd.h>
+#include <sys/socket.h>
 
 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 
 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
@@ -209,7 +210,7 @@ void clusterInit(void) {
         exit(1);
     }
     if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
         exit(1);
     }
     if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
-        clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
+        clusterAcceptHandler, NULL) == AE_ERR) redisPanic("Unrecoverable error creating Redis Cluster file event.");
     server.cluster.slots_to_keys = zslCreate();
 }
 
     server.cluster.slots_to_keys = zslCreate();
 }
 
@@ -901,7 +902,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
     } else {
         payload = zmalloc(totlen);
         hdr = (clusterMsg*) payload;
     } else {
         payload = zmalloc(totlen);
         hdr = (clusterMsg*) payload;
-        memcpy(payload,hdr,sizeof(hdr));
+        memcpy(payload,hdr,sizeof(*hdr));
     }
     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
     }
     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
@@ -1464,8 +1465,8 @@ void clusterCommand(redisClient *c) {
 /* Generates a DUMP-format representation of the object 'o', adding it to the
  * io stream pointed by 'rio'. This function can't fail. */
 void createDumpPayload(rio *payload, robj *o) {
 /* Generates a DUMP-format representation of the object 'o', adding it to the
  * io stream pointed by 'rio'. This function can't fail. */
 void createDumpPayload(rio *payload, robj *o) {
-    unsigned char hash[20], buf[2];
-    SHA1_CTX ctx;
+    unsigned char buf[2];
+    uint64_t crc;
 
     /* Serialize the object in a RDB-like format. It consist of an object type
      * byte followed by the serialized object. This is understood by RESTORE. */
 
     /* Serialize the object in a RDB-like format. It consist of an object type
      * byte followed by the serialized object. This is understood by RESTORE. */
@@ -1474,39 +1475,45 @@ void createDumpPayload(rio *payload, robj *o) {
     redisAssert(rdbSaveObject(payload,o));
 
     /* Write the footer, this is how it looks like:
     redisAssert(rdbSaveObject(payload,o));
 
     /* Write the footer, this is how it looks like:
-     * ----------------+---------------------+--------------+
-     * ... RDB payload | 2 bytes RDB version | 8 bytes SHA1 |
-     * ----------------+---------------------+--------------+
-     * The SHA1 is just 8 bytes of truncated SHA1 of everything excluding itself.
-     * The 2 bytes RDB version is a little endian unsigned integer.  */
+     * ----------------+---------------------+---------------+
+     * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
+     * ----------------+---------------------+---------------+
+     * RDB version and CRC are both in little endian.
+     */
+
+    /* RDB version */
     buf[0] = REDIS_RDB_VERSION & 0xff;
     buf[1] = (REDIS_RDB_VERSION >> 8) & 0xff;
     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
 
     buf[0] = REDIS_RDB_VERSION & 0xff;
     buf[1] = (REDIS_RDB_VERSION >> 8) & 0xff;
     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
 
-    SHA1Init(&ctx);
-    SHA1Update(&ctx,(unsigned char*)payload->io.buffer.ptr,
-                    sdslen(payload->io.buffer.ptr));
-    SHA1Final(hash,&ctx);
-    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,hash,8);
+    /* CRC64 */
+    crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
+                sdslen(payload->io.buffer.ptr));
+    memrev64ifbe(&crc);
+    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
 }
 
 /* Verify that the RDB version of the dump payload matches the one of this Redis
 }
 
 /* Verify that the RDB version of the dump payload matches the one of this Redis
- * instance and that the truncated SHA1 is ok.
+ * instance and that the checksum is ok.
  * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
  * is returned. */
 int verifyDumpPayload(unsigned char *p, size_t len) {
  * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
  * is returned. */
 int verifyDumpPayload(unsigned char *p, size_t len) {
-    unsigned char hash[20], *footer;
-    SHA1_CTX ctx;
+    unsigned char *footer;
     uint16_t rdbver;
     uint16_t rdbver;
+    uint64_t crc;
 
 
+    /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
     if (len < 10) return REDIS_ERR;
     footer = p+(len-10);
     if (len < 10) return REDIS_ERR;
     footer = p+(len-10);
+
+    /* Verify RDB version */
     rdbver = (footer[1] << 8) | footer[0];
     if (rdbver != REDIS_RDB_VERSION) return REDIS_ERR;
     rdbver = (footer[1] << 8) | footer[0];
     if (rdbver != REDIS_RDB_VERSION) return REDIS_ERR;
-    SHA1Init(&ctx);
-    SHA1Update(&ctx,p,len-8);
-    SHA1Final(hash,&ctx);
-    return (memcmp(hash,footer+2,8) == 0) ? REDIS_OK : REDIS_ERR;
+
+    /* Verify CRC64 */
+    crc = crc64(0,p,len-8);
+    memrev64ifbe(&crc);
+    return (memcmp(&crc,footer+2,8) == 0) ? REDIS_OK : REDIS_ERR;
 }
 
 /* DUMP keyname
 }
 
 /* DUMP keyname
@@ -1553,7 +1560,7 @@ void restoreCommand(redisClient *c) {
         return;
     }
 
         return;
     }
 
-    /* Verify truncated SHA1 and RDB version. */
+    /* Verify RDB version and data checksum. */
     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == REDIS_ERR) {
         addReplyError(c,"DUMP payload version or checksum are wrong");
         return;
     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == REDIS_ERR) {
         addReplyError(c,"DUMP payload version or checksum are wrong");
         return;
@@ -1569,7 +1576,7 @@ void restoreCommand(redisClient *c) {
 
     /* Create the key and set the TTL if any */
     dbAdd(c->db,c->argv[1],obj);
 
     /* Create the key and set the TTL if any */
     dbAdd(c->db,c->argv[1],obj);
-    if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
+    if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
     signalModifiedKey(c->db,c->argv[1]);
     addReply(c,shared.ok);
     server.dirty++;
     signalModifiedKey(c->db,c->argv[1]);
     addReply(c,shared.ok);
     server.dirty++;
@@ -1580,7 +1587,7 @@ void migrateCommand(redisClient *c) {
     int fd;
     long timeout;
     long dbid;
     int fd;
     long timeout;
     long dbid;
-    time_t ttl;
+    long long ttl = 0, expireat;
     robj *o;
     rio cmd, payload;
 
     robj *o;
     rio cmd, payload;
 
@@ -1608,7 +1615,7 @@ void migrateCommand(redisClient *c) {
         return;
     }
     if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
         return;
     }
     if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
-        addReplyError(c,"Timeout connecting to the client");
+        addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
         return;
     }
 
         return;
     }
 
@@ -1618,12 +1625,16 @@ void migrateCommand(redisClient *c) {
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
 
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
 
-    ttl = getExpire(c->db,c->argv[3]);
+    expireat = getExpire(c->db,c->argv[3]);
+    if (expireat != -1) {
+        ttl = expireat-mstime();
+        if (ttl < 1) ttl = 1;
+    }
     redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
     redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
     redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
     redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
-    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
+    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
 
     /* Finally the last argument that is the serailized object payload
      * in the DUMP format. */
 
     /* Finally the last argument that is the serailized object payload
      * in the DUMP format. */
@@ -1640,7 +1651,7 @@ void migrateCommand(redisClient *c) {
 
         while ((towrite = sdslen(buf)-pos) > 0) {
             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
 
         while ((towrite = sdslen(buf)-pos) > 0) {
             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
-            nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
+            nwritten = syncWrite(fd,buf+pos,towrite,timeout);
             if (nwritten != (signed)towrite) goto socket_wr_err;
             pos += nwritten;
         }
             if (nwritten != (signed)towrite) goto socket_wr_err;
             pos += nwritten;
         }
@@ -1679,19 +1690,13 @@ void migrateCommand(redisClient *c) {
     return;
 
 socket_wr_err:
     return;
 
 socket_wr_err:
-    redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
-        strerror(errno));
-    addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
-        strerror(errno));
+    addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
     sdsfree(cmd.io.buffer.ptr);
     close(fd);
     return;
 
 socket_rd_err:
     sdsfree(cmd.io.buffer.ptr);
     close(fd);
     return;
 
 socket_rd_err:
-    redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
-        strerror(errno));
-    addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
-        strerror(errno));
+    addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
     sdsfree(cmd.io.buffer.ptr);
     close(fd);
     return;
     sdsfree(cmd.io.buffer.ptr);
     close(fd);
     return;