X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/b075621fb7d8b44b10d849e8db6db9b2498056ad..af0b220756571bc8faf57a0c7b7389dd86a60376:/src/replication.c?ds=sidebyside diff --git a/src/replication.c b/src/replication.c index 851a40f8..720cd4c1 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1,8 +1,40 @@ +/* Asynchronous replication implementation. + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ + + #include "redis.h" #include #include #include +#include #include /* ---------------------------------- MASTER -------------------------------- */ @@ -10,38 +42,8 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln; listIter li; - int outc = 0, j; - robj **outv; - /* We need 1+(ARGS*3) objects since commands are using the new protocol - * and we one 1 object for the first "*\r\n" multibulk count, then - * for every additional object we have "$\r\n" + object + "\r\n". */ - robj *static_outv[REDIS_STATIC_ARGS*3+1]; - robj *lenobj; - - if (argc <= REDIS_STATIC_ARGS) { - outv = static_outv; - } else { - outv = zmalloc(sizeof(robj*)*(argc*3+1)); - } + int j; - lenobj = createObject(REDIS_STRING, - sdscatprintf(sdsempty(), "*%d\r\n", argc)); - lenobj->refcount = 0; - outv[outc++] = lenobj; - for (j = 0; j < argc; j++) { - lenobj = createObject(REDIS_STRING, - sdscatprintf(sdsempty(),"$%lu\r\n", - (unsigned long) stringObjectLen(argv[j]))); - lenobj->refcount = 0; - outv[outc++] = lenobj; - outv[outc++] = argv[j]; - outv[outc++] = shared.crlf; - } - - /* Increment all the refcounts at start and decrement at end in order to - * be sure to free objects if there is no slave in a replication state - * able to be feed with commands */ - for (j = 0; j < outc; j++) incrRefCount(outv[j]); listRewind(slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; @@ -49,47 +51,47 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; - /* Feed all the other slaves, MONITORs and so on */ + /* Feed slaves that are waiting for the initial SYNC (so these commands + * are queued in the output buffer until the initial SYNC completes), + * or are already in sync with the master. 
*/ if (slave->slaveseldb != dictid) { robj *selectcmd; - switch(dictid) { - case 0: selectcmd = shared.select0; break; - case 1: selectcmd = shared.select1; break; - case 2: selectcmd = shared.select2; break; - case 3: selectcmd = shared.select3; break; - case 4: selectcmd = shared.select4; break; - case 5: selectcmd = shared.select5; break; - case 6: selectcmd = shared.select6; break; - case 7: selectcmd = shared.select7; break; - case 8: selectcmd = shared.select8; break; - case 9: selectcmd = shared.select9; break; - default: + if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) { + selectcmd = shared.select[dictid]; + incrRefCount(selectcmd); + } else { selectcmd = createObject(REDIS_STRING, sdscatprintf(sdsempty(),"select %d\r\n",dictid)); - selectcmd->refcount = 0; - break; } addReply(slave,selectcmd); + decrRefCount(selectcmd); slave->slaveseldb = dictid; } - for (j = 0; j < outc; j++) addReply(slave,outv[j]); + addReplyMultiBulkLen(slave,argc); + for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } - for (j = 0; j < outc; j++) decrRefCount(outv[j]); - if (outv != static_outv) zfree(outv); } -void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) { +void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) { listNode *ln; listIter li; - int j; + int j, port; sds cmdrepr = sdsnew("+"); robj *cmdobj; + char ip[32]; struct timeval tv; gettimeofday(&tv,NULL); cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); - if (dictid != 0) cmdrepr = sdscatprintf(cmdrepr,"(db %d) ", dictid); + if (c->flags & REDIS_LUA_CLIENT) { + cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); + } else if (c->flags & REDIS_UNIX_SOCKET) { + cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); + } else { + anetPeerToString(c->fd,ip,&port); + cmdrepr = sdscatprintf(cmdrepr,"[%d %s:%d] ",dictid,ip,port); + } for (j = 0; j < argc; j++) { if (argv[j]->encoding == 
REDIS_ENCODING_INT) { @@ -118,7 +120,7 @@ void syncCommand(redisClient *c) { /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ - if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) { + if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) { addReplyError(c,"Can't SYNC while not connected with my master"); return; } @@ -135,7 +137,7 @@ void syncCommand(redisClient *c) { redisLog(REDIS_NOTICE,"Slave ask for synchronization"); /* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ - if (server.bgsavechildpid != -1) { + if (server.rdb_child_pid != -1) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save */ @@ -151,8 +153,7 @@ void syncCommand(redisClient *c) { if (ln) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. */ - listRelease(c->reply); - c->reply = listDup(slave->reply); + copyClientOutputBuffer(c,slave); c->replstate = REDIS_REPL_WAIT_BGSAVE_END; redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { @@ -164,7 +165,7 @@ void syncCommand(redisClient *c) { } else { /* Ok we don't have a BGSAVE in progress, let's start one */ redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); - if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { + if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); addReplyError(c,"Unable to perform background save"); return; @@ -178,6 +179,46 @@ void syncCommand(redisClient *c) { return; } +/* REPLCONF