]> git.saurik.com Git - redis.git/blobdiff - src/cluster.c
Copyright date fixed in COPYING file.
[redis.git] / src / cluster.c
index 07552c18ae05e72671152c3719e90bba5e056af0..498715782f2a81cbf03992c229dcc1973315b2e9 100644 (file)
@@ -1,3 +1,33 @@
+/* Redis Cluster implementation.
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
 #include "redis.h"
 #include "endianconv.h"
 
@@ -210,7 +240,7 @@ void clusterInit(void) {
         exit(1);
     }
     if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
-        clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
+        clusterAcceptHandler, NULL) == AE_ERR) redisPanic("Unrecoverable error creating Redis Cluster file event.");
     server.cluster.slots_to_keys = zslCreate();
 }
 
@@ -902,7 +932,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
     } else {
         payload = zmalloc(totlen);
         hdr = (clusterMsg*) payload;
-        memcpy(payload,hdr,sizeof(hdr));
+        memcpy(payload,hdr,sizeof(*hdr));
     }
     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
@@ -1539,15 +1569,25 @@ void dumpCommand(redisClient *c) {
     return;
 }
 
-/* RESTORE key ttl serialized-value */
+/* RESTORE key ttl serialized-value [REPLACE] */
 void restoreCommand(redisClient *c) {
     long ttl;
     rio payload;
-    int type;
+    int j, type, replace = 0;
     robj *obj;
 
+    /* Parse additional options */
+    for (j = 4; j < c->argc; j++) {
+        if (!strcasecmp(c->argv[j]->ptr,"replace")) {
+            replace = 1;
+        } else {
+            addReply(c,shared.syntaxerr);
+            return;
+        }
+    }
+
     /* Make sure this key does not already exist here... */
-    if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
+    if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
         addReplyError(c,"Target key name is busy.");
         return;
     }
@@ -1574,6 +1614,9 @@ void restoreCommand(redisClient *c) {
         return;
     }
 
+    /* Remove the old key if needed. */
+    if (replace) dbDelete(c->db,c->argv[1]);
+
     /* Create the key and set the TTL if any */
     dbAdd(c->db,c->argv[1],obj);
     if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
@@ -1582,15 +1625,27 @@ void restoreCommand(redisClient *c) {
     server.dirty++;
 }
 
-/* MIGRATE host port key dbid timeout */
+/* MIGRATE host port key dbid timeout [COPY | REPLACE] */
 void migrateCommand(redisClient *c) {
-    int fd;
+    int fd, copy = 0, replace = 0, j;
     long timeout;
     long dbid;
     long long ttl = 0, expireat;
     robj *o;
     rio cmd, payload;
 
+    /* Parse additional options */
+    for (j = 6; j < c->argc; j++) {
+        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
+            copy = 1;
+        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
+            replace = 1;
+        } else {
+            addReply(c,shared.syntaxerr);
+            return;
+        }
+    }
+
     /* Sanity check */
     if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
         return;
@@ -1630,19 +1685,24 @@ void migrateCommand(redisClient *c) {
         ttl = expireat-mstime();
         if (ttl < 1) ttl = 1;
     }
-    redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
+    redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
     redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
 
-    /* Finally the last argument that is the serailized object payload
-     * in the DUMP format. */
+    /* Emit the payload argument, that is the serailized object using
+     * the DUMP format. */
     createDumpPayload(&payload,o);
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                                 sdslen(payload.io.buffer.ptr)));
     sdsfree(payload.io.buffer.ptr);
 
+    /* Add the REPLACE option to the RESTORE command if it was specified
+     * as a MIGRATE option. */
+    if (replace)
+        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
+
     /* Tranfer the query to the other node in 64K chunks. */
     {
         sds buf = cmd.io.buffer.ptr;
@@ -1673,8 +1733,11 @@ void migrateCommand(redisClient *c) {
         } else {
             robj *aux;
 
-            dbDelete(c->db,c->argv[3]);
-            signalModifiedKey(c->db,c->argv[3]);
+            if (!copy) {
+                /* No COPY option: remove the local key, signal the change. */
+                dbDelete(c->db,c->argv[3]);
+                signalModifiedKey(c->db,c->argv[3]);
+            }
             addReply(c,shared.ok);
             server.dirty++;