]> git.saurik.com Git - redis.git/blobdiff - src/cluster.c
Scripting: add helper functions redis.error_reply() and redis.status_reply().
[redis.git] / src / cluster.c
index cfd891b4f573c229ee69da5a876b0e9f9bbca210..57243132d0da6656655d59c63b7bd0614b89f852 100644 (file)
@@ -1,8 +1,10 @@
 #include "redis.h"
+#include "endianconv.h"
 
 #include <arpa/inet.h>
 #include <fcntl.h>
 #include <unistd.h>
+#include <sys/socket.h>
 
 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
@@ -19,20 +21,6 @@ int clusterAddSlot(clusterNode *n, int slot);
  * Initialization
  * -------------------------------------------------------------------------- */
 
-void clusterGetRandomName(char *p) {
-    FILE *fp = fopen("/dev/urandom","r");
-    char *charset = "0123456789abcdef";
-    int j;
-
-    if (fp == NULL || fread(p,REDIS_CLUSTER_NAMELEN,1,fp) == 0) {
-        for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
-            p[j] = rand();
-    }
-    for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
-        p[j] = charset[p[j] & 0x0F];
-    fclose(fp);
-}
-
 int clusterLoadConfig(char *filename) {
     FILE *fp = fopen(filename,"r");
     char *line;
@@ -222,7 +210,7 @@ void clusterInit(void) {
         exit(1);
     }
     if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
-        clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
+        clusterAcceptHandler, NULL) == AE_ERR) redisPanic("Unrecoverable error creating Redis Cluster file event.");
     server.cluster.slots_to_keys = zslCreate();
 }
 
@@ -304,7 +292,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
     if (nodename)
         memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
     else
-        clusterGetRandomName(node->name);
+        getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN);
     node->flags = flags;
     memset(node->slots,0,sizeof(node->slots));
     node->numslaves = 0;
@@ -377,7 +365,7 @@ clusterNode *clusterLookupNode(char *name) {
     de = dictFind(server.cluster.nodes,s);
     sdsfree(s);
     if (de == NULL) return NULL;
-    return dictGetEntryVal(de);
+    return dictGetVal(de);
 }
 
 /* This is only used after the handshake. When we connect a given IP/PORT
@@ -439,7 +427,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
              * time PONG figure if it is newer than our figure.
              * Note that it's not a problem if we have a PING already 
              * in progress against this node. */
-            if (node->pong_received < ntohl(g->pong_received)) {
+            if (node->pong_received < (signed) ntohl(g->pong_received)) {
                  redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
                 node->pong_received = ntohl(g->pong_received);
             }
@@ -513,6 +501,8 @@ int clusterProcessPacket(clusterLink *link) {
 
     redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
         type, (unsigned long) totlen);
+
+    /* Perform sanity checks */
     if (totlen < 8) return 1;
     if (totlen > sdslen(link->rcvbuf)) return 1;
     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
@@ -531,7 +521,16 @@ int clusterProcessPacket(clusterLink *link) {
         explen += sizeof(clusterMsgDataFail);
         if (totlen != explen) return 1;
     }
+    if (type == CLUSTERMSG_TYPE_PUBLISH) {
+        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
+
+        explen += sizeof(clusterMsgDataPublish) +
+                ntohl(hdr->data.publish.msg.channel_len) +
+                ntohl(hdr->data.publish.msg.message_len);
+        if (totlen != explen) return 1;
+    }
 
+    /* Ready to process the packet. Dispatch by type. */
     sender = clusterLookupNode(hdr->sender);
     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
         int update_config = 0;
@@ -602,7 +601,7 @@ int clusterProcessPacket(clusterLink *link) {
             }
         }
         /* Update our info about the node */
-        link->node->pong_received = time(NULL);
+        if (link->node) link->node->pong_received = time(NULL);
 
         /* Update master/slave info */
         if (sender) {
@@ -665,6 +664,22 @@ int clusterProcessPacket(clusterLink *link) {
             clusterUpdateState();
             clusterSaveConfigOrDie();
         }
+    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
+        robj *channel, *message;
+        uint32_t channel_len, message_len;
+
+        /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
+        if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) {
+            channel_len = ntohl(hdr->data.publish.msg.channel_len);
+            message_len = ntohl(hdr->data.publish.msg.message_len);
+            channel = createStringObject(
+                        (char*)hdr->data.publish.msg.bulk_data,channel_len);
+            message = createStringObject(
+                        (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len);
+            pubsubPublishMessage(channel,message);
+            decrRefCount(channel);
+            decrRefCount(message);
+        }
     } else {
         redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
     }
@@ -766,7 +781,7 @@ void clusterBroadcastMessage(void *buf, size_t len) {
 
     di = dictGetIterator(server.cluster.nodes);
     while((de = dictNext(di)) != NULL) {
-        clusterNode *node = dictGetEntryVal(de);
+        clusterNode *node = dictGetVal(de);
 
         if (!node->link) continue;
         if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
@@ -777,7 +792,7 @@ void clusterBroadcastMessage(void *buf, size_t len) {
 
 /* Build the message header */
 void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
-    int totlen;
+    int totlen = 0;
 
     memset(hdr,0,sizeof(*hdr));
     hdr->type = htons(type);
@@ -822,7 +837,7 @@ void clusterSendPing(clusterLink *link, int type) {
     /* Populate the gossip fields */
     while(freshnodes > 0 && gossipcount < 3) {
         struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
-        clusterNode *this = dictGetEntryVal(de);
+        clusterNode *this = dictGetVal(de);
         clusterMsgDataGossip *gossip;
         int j;
 
@@ -887,7 +902,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
     } else {
         payload = zmalloc(totlen);
         hdr = (clusterMsg*) payload;
-        memcpy(payload,hdr,sizeof(hdr));
+        memcpy(payload,hdr,sizeof(*hdr));
     }
     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
@@ -943,7 +958,7 @@ void clusterCron(void) {
     /* Check if we have disconnected nodes and reestablish the connection. */
     di = dictGetIterator(server.cluster.nodes);
     while((de = dictNext(di)) != NULL) {
-        clusterNode *node = dictGetEntryVal(de);
+        clusterNode *node = dictGetVal(de);
 
         if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
         if (node->link == NULL) {
@@ -978,7 +993,7 @@ void clusterCron(void) {
      * the oldest ping_sent time */
     for (j = 0; j < 5; j++) {
         de = dictGetRandomKey(server.cluster.nodes);
-        clusterNode *this = dictGetEntryVal(de);
+        clusterNode *this = dictGetVal(de);
 
         if (this->link == NULL) continue;
         if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
@@ -995,7 +1010,7 @@ void clusterCron(void) {
     /* Iterate nodes to check if we need to flag something as failing */
     di = dictGetIterator(server.cluster.nodes);
     while((de = dictNext(di)) != NULL) {
-        clusterNode *node = dictGetEntryVal(de);
+        clusterNode *node = dictGetVal(de);
         int delay;
 
         if (node->flags &
@@ -1126,7 +1141,7 @@ sds clusterGenNodesDescription(void) {
 
     di = dictGetIterator(server.cluster.nodes);
     while((de = dictNext(di)) != NULL) {
-        clusterNode *node = dictGetEntryVal(de);
+        clusterNode *node = dictGetVal(de);
 
         /* Node coordinates */
         ci = sdscatprintf(ci,"%.40s %s:%d ",
@@ -1248,7 +1263,10 @@ void clusterCommand(redisClient *c) {
         addReplyBulk(c,o);
         decrRefCount(o);
     } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
-               !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) {
+               !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
+    {
+        /* CLUSTER ADDSLOTS <slot> [slot] ... */
+        /* CLUSTER DELSLOTS <slot> [slot] ... */
         int j, slot;
         unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
         int del = !strcasecmp(c->argv[1]->ptr,"delslots");
@@ -1441,9 +1459,86 @@ void clusterCommand(redisClient *c) {
 }
 
 /* -----------------------------------------------------------------------------
- * RESTORE and MIGRATE commands
+ * DUMP, RESTORE and MIGRATE commands
  * -------------------------------------------------------------------------- */
 
+/* Generates a DUMP-format representation of the object 'o', adding it to the
+ * io stream pointed by 'rio'. This function can't fail. */
+void createDumpPayload(rio *payload, robj *o) {
+    unsigned char buf[2];
+    uint64_t crc;
+
+    /* Serialize the object in a RDB-like format. It consist of an object type
+     * byte followed by the serialized object. This is understood by RESTORE. */
+    rioInitWithBuffer(payload,sdsempty());
+    redisAssert(rdbSaveObjectType(payload,o));
+    redisAssert(rdbSaveObject(payload,o));
+
+    /* Write the footer, this is how it looks like:
+     * ----------------+---------------------+---------------+
+     * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
+     * ----------------+---------------------+---------------+
+     * RDB version and CRC are both in little endian.
+     */
+
+    /* RDB version */
+    buf[0] = REDIS_RDB_VERSION & 0xff;
+    buf[1] = (REDIS_RDB_VERSION >> 8) & 0xff;
+    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
+
+    /* CRC64 */
+    crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
+                sdslen(payload->io.buffer.ptr));
+    memrev64ifbe(&crc);
+    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
+}
+
+/* Verify that the RDB version of the dump payload matches the one of this Redis
+ * instance and that the checksum is ok.
+ * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
+ * is returned. */
+int verifyDumpPayload(unsigned char *p, size_t len) {
+    unsigned char *footer;
+    uint16_t rdbver;
+    uint64_t crc;
+
+    /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
+    if (len < 10) return REDIS_ERR;
+    footer = p+(len-10);
+
+    /* Verify RDB version */
+    rdbver = (footer[1] << 8) | footer[0];
+    if (rdbver != REDIS_RDB_VERSION) return REDIS_ERR;
+
+    /* Verify CRC64 */
+    crc = crc64(0,p,len-8);
+    memrev64ifbe(&crc);
+    return (memcmp(&crc,footer+2,8) == 0) ? REDIS_OK : REDIS_ERR;
+}
+
+/* DUMP keyname
+ * DUMP is actually not used by Redis Cluster but it is the obvious
+ * complement of RESTORE and can be useful for different applications. */
+void dumpCommand(redisClient *c) {
+    robj *o, *dumpobj;
+    rio payload;
+
+    /* Check if the key is here. */
+    if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
+        addReply(c,shared.nullbulk);
+        return;
+    }
+
+    /* Create the DUMP encoded representation. */
+    createDumpPayload(&payload,o);
+
+    /* Transfer to the client */
+    dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
+    addReplyBulk(c,dumpobj);
+    decrRefCount(dumpobj);
+    return;
+}
+
 /* RESTORE key ttl serialized-value */
 void restoreCommand(redisClient *c) {
     long ttl;
@@ -1465,6 +1560,12 @@ void restoreCommand(redisClient *c) {
         return;
     }
 
+    /* Verify RDB version and data checksum. */
+    if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == REDIS_ERR) {
+        addReplyError(c,"DUMP payload version or checksum are wrong");
+        return;
+    }
+
     rioInitWithBuffer(&payload,c->argv[3]->ptr);
     if (((type = rdbLoadObjectType(&payload)) == -1) ||
         ((obj = rdbLoadObject(type,&payload)) == NULL))
@@ -1475,7 +1576,8 @@ void restoreCommand(redisClient *c) {
 
     /* Create the key and set the TTL if any */
     dbAdd(c->db,c->argv[1],obj);
-    if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
+    if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
+    signalModifiedKey(c->db,c->argv[1]);
     addReply(c,shared.ok);
     server.dirty++;
 }
@@ -1485,7 +1587,7 @@ void migrateCommand(redisClient *c) {
     int fd;
     long timeout;
     long dbid;
-    time_t ttl;
+    long long ttl = 0, expireat;
     robj *o;
     rio cmd, payload;
 
@@ -1500,7 +1602,7 @@ void migrateCommand(redisClient *c) {
      * nothing to migrate (for instance the key expired in the meantime), but
      * we include such information in the reply string. */
     if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
-        addReplySds(c,sdsnew("+NOKEY"));
+        addReplySds(c,sdsnew("+NOKEY\r\n"));
         return;
     }
     
@@ -1513,28 +1615,32 @@ void migrateCommand(redisClient *c) {
         return;
     }
     if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
-        addReplyError(c,"Timeout connecting to the client");
+        addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
         return;
     }
 
+    /* Create RESTORE payload and generate the protocol to call the command. */
     rioInitWithBuffer(&cmd,sdsempty());
     redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
 
-    ttl = getExpire(c->db,c->argv[3]);
+    expireat = getExpire(c->db,c->argv[3]);
+    if (expireat != -1) {
+        ttl = expireat-mstime();
+        if (ttl < 1) ttl = 1;
+    }
     redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
     redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
-    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
+    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
 
     /* Finally the last argument that is the serailized object payload
-     * in the form: <type><rdb-serialized-object>. */
-    rioInitWithBuffer(&payload,sdsempty());
-    redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
-    redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1);
-    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
+     * in the DUMP format. */
+    createDumpPayload(&payload,o);
+    redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
+                                sdslen(payload.io.buffer.ptr)));
     sdsfree(payload.io.buffer.ptr);
 
     /* Tranfer the query to the other node in 64K chunks. */
@@ -1545,7 +1651,7 @@ void migrateCommand(redisClient *c) {
 
         while ((towrite = sdslen(buf)-pos) > 0) {
             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
-            nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
+            nwritten = syncWrite(fd,buf+pos,towrite,timeout);
             if (nwritten != (signed)towrite) goto socket_wr_err;
             pos += nwritten;
         }
@@ -1568,11 +1674,12 @@ void migrateCommand(redisClient *c) {
             robj *aux;
 
             dbDelete(c->db,c->argv[3]);
+            signalModifiedKey(c->db,c->argv[3]);
             addReply(c,shared.ok);
             server.dirty++;
 
             /* Translate MIGRATE as DEL for replication/AOF. */
-            aux = createStringObject("DEL",2);
+            aux = createStringObject("DEL",3);
             rewriteClientCommandVector(c,2,aux,c->argv[3]);
             decrRefCount(aux);
         }
@@ -1583,48 +1690,29 @@ void migrateCommand(redisClient *c) {
     return;
 
 socket_wr_err:
-    redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
-        strerror(errno));
-    addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
-        strerror(errno));
+    addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
     sdsfree(cmd.io.buffer.ptr);
     close(fd);
     return;
 
 socket_rd_err:
-    redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
-        strerror(errno));
-    addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
-        strerror(errno));
+    addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
     sdsfree(cmd.io.buffer.ptr);
     close(fd);
     return;
 }
 
-/* DUMP keyname
- * DUMP is actually not used by Redis Cluster but it is the obvious
- * complement of RESTORE and can be useful for different applications. */
-void dumpCommand(redisClient *c) {
-    robj *o, *dumpobj;
-    rio payload;
-
-    /* Check if the key is here. */
-    if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
-        addReply(c,shared.nullbulk);
+/* The ASKING command is required after a -ASK redirection.
+ * The client should issue ASKING before to actualy send the command to
+ * the target instance. See the Redis Cluster specification for more
+ * information. */
+void askingCommand(redisClient *c) {
+    if (server.cluster_enabled == 0) {
+        addReplyError(c,"This instance has cluster support disabled");
         return;
     }
-
-    /* Serialize the object in a RDB-like format. It consist of an object type
-     * byte followed by the serialized object. This is understood by RESTORE. */
-    rioInitWithBuffer(&payload,sdsempty());
-    redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o));
-    redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o));
-
-    /* Transfer to the client */
-    dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
-    addReplyBulk(c,dumpobj);
-    decrRefCount(dumpobj);
-    return;
+    c->flags |= REDIS_ASKING;
+    addReply(c,shared.ok);
 }
 
 /* -----------------------------------------------------------------------------
@@ -1720,9 +1808,12 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
     }
     /* Handle the case in which we are receiving this hash slot from
      * another instance, so we'll accept the query even if in the table
-     * it is assigned to a different node. */
-    if (server.cluster.importing_slots_from[slot] != NULL)
+     * it is assigned to a different node, but only if the client
+     * issued an ASKING command before. */
+    if (server.cluster.importing_slots_from[slot] != NULL &&
+        c->flags & REDIS_ASKING) {
         return server.cluster.myself;
+    }
     /* It's not a -ASK case. Base case: just return the right node. */
     return n;
 }