X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/b91d605a35c294573f0213c89c421d09b538c2b6..d10a01bb6d8645f66b66e1fb3d1c52a0dd7bd8fe:/src/replication.c diff --git a/src/replication.c b/src/replication.c index 363ce54a..c1e46191 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3,43 +3,16 @@ #include #include #include +#include #include +/* ---------------------------------- MASTER -------------------------------- */ + void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listNode *ln; listIter li; - int outc = 0, j; - robj **outv; - /* We need 1+(ARGS*3) objects since commands are using the new protocol - * and we one 1 object for the first "*\r\n" multibulk count, then - * for every additional object we have "$\r\n" + object + "\r\n". */ - robj *static_outv[REDIS_STATIC_ARGS*3+1]; - robj *lenobj; - - if (argc <= REDIS_STATIC_ARGS) { - outv = static_outv; - } else { - outv = zmalloc(sizeof(robj*)*(argc*3+1)); - } + int j; - lenobj = createObject(REDIS_STRING, - sdscatprintf(sdsempty(), "*%d\r\n", argc)); - lenobj->refcount = 0; - outv[outc++] = lenobj; - for (j = 0; j < argc; j++) { - lenobj = createObject(REDIS_STRING, - sdscatprintf(sdsempty(),"$%lu\r\n", - (unsigned long) stringObjectLen(argv[j]))); - lenobj->refcount = 0; - outv[outc++] = lenobj; - outv[outc++] = argv[j]; - outv[outc++] = shared.crlf; - } - - /* Increment all the refcounts at start and decrement at end in order to - * be sure to free objects if there is no slave in a replication state - * able to be feed with commands */ - for (j = 0; j < outc; j++) incrRefCount(outv[j]); listRewind(slaves,&li); while((ln = listNext(&li))) { redisClient *slave = ln->value; @@ -47,47 +20,45 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue; - /* Feed all the other slaves, MONITORs and so on */ + /* Feed slaves that are waiting for the initial SYNC (so these commands + * are queued in the output buffer until the intial SYNC completes), + * or are already in sync with the master. */ if (slave->slaveseldb != dictid) { robj *selectcmd; - switch(dictid) { - case 0: selectcmd = shared.select0; break; - case 1: selectcmd = shared.select1; break; - case 2: selectcmd = shared.select2; break; - case 3: selectcmd = shared.select3; break; - case 4: selectcmd = shared.select4; break; - case 5: selectcmd = shared.select5; break; - case 6: selectcmd = shared.select6; break; - case 7: selectcmd = shared.select7; break; - case 8: selectcmd = shared.select8; break; - case 9: selectcmd = shared.select9; break; - default: + if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) { + selectcmd = shared.select[dictid]; + incrRefCount(selectcmd); + } else { selectcmd = createObject(REDIS_STRING, sdscatprintf(sdsempty(),"select %d\r\n",dictid)); - selectcmd->refcount = 0; - break; } addReply(slave,selectcmd); + decrRefCount(selectcmd); slave->slaveseldb = dictid; } - for (j = 0; j < outc; j++) addReply(slave,outv[j]); + addReplyMultiBulkLen(slave,argc); + for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]); } - for (j = 0; j < outc; j++) decrRefCount(outv[j]); - if (outv != static_outv) zfree(outv); } -void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) { +void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) { listNode *ln; listIter li; - int j; + int j, port; sds cmdrepr = sdsnew("+"); robj *cmdobj; + char ip[32]; struct timeval tv; gettimeofday(&tv,NULL); - cmdrepr = sdscatprintf(cmdrepr,"%ld.%ld ",(long)tv.tv_sec,(long)tv.tv_usec); - if (dictid != 0) cmdrepr = sdscatprintf(cmdrepr,"(db %d) ", dictid); + cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); + if (c->flags & REDIS_LUA_CLIENT) { + cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ", dictid); + } else { + anetPeerToString(c->fd,ip,&port); + cmdrepr = sdscatprintf(cmdrepr,"[%d %s:%d] ", dictid,ip,port); + } for (j = 0; j < argc; j++) { if (argv[j]->encoding == REDIS_ENCODING_INT) { @@ -110,76 +81,14 @@ void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) decrRefCount(cmdobj); } -int syncWrite(int fd, char *ptr, ssize_t size, int timeout) { - ssize_t nwritten, ret = size; - time_t start = time(NULL); - - timeout++; - while(size) { - if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) { - nwritten = write(fd,ptr,size); - if (nwritten == -1) return -1; - ptr += nwritten; - size -= nwritten; - } - if ((time(NULL)-start) > timeout) { - errno = ETIMEDOUT; - return -1; - } - } - return ret; -} - -int syncRead(int fd, char *ptr, ssize_t size, int timeout) { - ssize_t nread, totread = 0; - time_t start = time(NULL); - - timeout++; - while(size) { - if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) { - nread = read(fd,ptr,size); - if (nread == -1) return -1; - ptr += nread; - size -= nread; - totread += nread; - } - if ((time(NULL)-start) > timeout) { - errno = ETIMEDOUT; - return -1; - } - } - return totread; -} - -int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) { - ssize_t nread = 0; - - size--; - while(size) { - char c; - - if (syncRead(fd,&c,1,timeout) == -1) return -1; - if (c == '\n') { - *ptr = '\0'; - if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0'; - return nread; - } else { - *ptr++ = c; - *ptr = '\0'; - nread++; - } - } - return nread; -} - void syncCommand(redisClient *c) { /* ignore SYNC if aleady slave or in monitor mode */ if (c->flags & REDIS_SLAVE) return; /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ - if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) { - addReplySds(c,sdsnew("-ERR Can't SYNC while not connected with my master\r\n")); + if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) { + addReplyError(c,"Can't SYNC while not connected with my master"); return; } @@ -188,14 +97,14 @@ void syncCommand(redisClient *c) { * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ if (listLength(c->reply) != 0) { - addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n")); + addReplyError(c,"SYNC is invalid with pending input"); return; } redisLog(REDIS_NOTICE,"Slave ask for synchronization"); /* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ - if (server.bgsavechildpid != -1) { + if (server.rdb_child_pid != -1) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save */ @@ -211,8 +120,7 @@ void syncCommand(redisClient *c) { if (ln) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. */ - listRelease(c->reply); - c->reply = listDup(slave->reply); + copyClientOutputBuffer(c,slave); c->replstate = REDIS_REPL_WAIT_BGSAVE_END; redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { @@ -224,9 +132,9 @@ void syncCommand(redisClient *c) { } else { /* Ok we don't have a BGSAVE in progress, let's start one */ redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); - if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { + if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); - addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n")); + addReplyError(c,"Unable to perform background save"); return; } c->replstate = REDIS_REPL_WAIT_BGSAVE_END; @@ -238,6 +146,46 @@ void syncCommand(redisClient *c) { return; } +/* REPLCONF