]> git.saurik.com Git - redis.git/commitdiff
Merge pull request #576 from saj/fix-slave-ping-period
authorSalvatore Sanfilippo <antirez@gmail.com>
Wed, 5 Sep 2012 13:59:37 +0000 (06:59 -0700)
committerSalvatore Sanfilippo <antirez@gmail.com>
Wed, 5 Sep 2012 13:59:37 +0000 (06:59 -0700)
Bug fix: slaves being pinged every second

1  2 
src/redis.h
src/replication.c

diff --combined src/redis.h
index 5e4ee844c9879e2948bf79d46680a193ac8bd5a0,e79855d8699165b37a9b8266b382b7523e176a92..ec2bde2219a7ada61e1fe2d832b03a0136d17fe2
  #define REDIS_SLOWLOG_MAX_LEN 128
  #define REDIS_MAX_CLIENTS 10000
  #define REDIS_AUTHPASS_MAX_LEN 512
 -
 +#define REDIS_DEFAULT_SLAVE_PRIORITY 100
  #define REDIS_REPL_TIMEOUT 60
  #define REDIS_REPL_PING_SLAVE_PERIOD 10
 -
  #define REDIS_RUN_ID_SIZE 40
  #define REDIS_OPS_SEC_SAMPLES 16
  
@@@ -84,8 -85,6 +84,8 @@@
  #define REDIS_CMD_NOSCRIPT  64              /* "s" flag */
  #define REDIS_CMD_RANDOM 128                /* "R" flag */
  #define REDIS_CMD_SORT_FOR_SCRIPT 256       /* "S" flag */
 +#define REDIS_CMD_LOADING 512               /* "l" flag */
 +#define REDIS_CMD_STALE 1024                /* "t" flag */
  
  /* Object types */
  #define REDIS_STRING 0
  #define REDIS_REPL_NONE 0 /* No active replication */
  #define REDIS_REPL_CONNECT 1 /* Must connect to master */
  #define REDIS_REPL_CONNECTING 2 /* Connecting to master */
 -#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
 -#define REDIS_REPL_CONNECTED 4 /* Connected to master */
 +#define REDIS_REPL_RECEIVE_PONG 3 /* Wait for PING reply */
 +#define REDIS_REPL_TRANSFER 4 /* Receiving .rdb from master */
 +#define REDIS_REPL_CONNECTED 5 /* Connected to master */
  
  /* Synchronous read timeout - slave side */
  #define REDIS_REPL_SYNCIO_TIMEOUT 5
  #define REDIS_PROPAGATE_AOF 1
  #define REDIS_PROPAGATE_REPL 2
  
 +/* Using the following macro you can run code inside serverCron() with the
 + * specified period, specified in milliseconds.
 + * The actual resolution depends on REDIS_HZ. */
 +#define run_with_period(_ms_) if (!(server.cronloops%((_ms_)/(1000/REDIS_HZ))))
 +
  /* We can print the stacktrace, so our assert is defined this way: */
  #define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
  #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
@@@ -584,7 -577,6 +584,7 @@@ struct redisServer 
      int arch_bits;              /* 32 or 64 depending on sizeof(long) */
      int cronloops;              /* Number of times the cron function run */
      char runid[REDIS_RUN_ID_SIZE+1];  /* ID always different at every exec. */
 +    int sentinel_mode;          /* True if this instance is a Sentinel. */
      /* Networking */
      int port;                   /* TCP listening port */
      char *bindaddr;             /* Bind address or NULL */
      time_t aof_last_fsync;            /* UNIX time of last fsync() */
      time_t aof_rewrite_time_last;   /* Time used by last AOF rewrite run. */
      time_t aof_rewrite_time_start;  /* Current AOF rewrite start time. */
 +    int aof_lastbgrewrite_status;   /* REDIS_OK or REDIS_ERR */
      unsigned long aof_delayed_fsync;  /* delayed AOF fsync() counter */
      /* RDB persistence */
      long long dirty;                /* Changes to DB from the last save */
      char *masterauth;               /* AUTH with this password with master */
      char *masterhost;               /* Hostname of master */
      int masterport;                 /* Port of master */
-     int repl_ping_slave_period;     /* Master pings the salve every N seconds */
+     int repl_ping_slave_period;     /* Master pings the slave every N seconds */
      int repl_timeout;               /* Timeout after N seconds of master idle */
      redisClient *master;     /* Client that is master for this slave */
      int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
      int repl_state;          /* Replication status if the instance is a slave */
 -    off_t repl_transfer_left;  /* Bytes left reading .rdb  */
 +    off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
 +    off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
 +    off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
      int repl_transfer_s;     /* Slave -> Master SYNC socket */
      int repl_transfer_fd;    /* Slave -> Master SYNC temp file descriptor */
      char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
      int repl_serve_stale_data; /* Serve stale data when link is down? */
      int repl_slave_ro;          /* Slave is read only? */
      time_t repl_down_since; /* Unix time at which link with master went down */
 +    int slave_priority;             /* Reported in INFO and used by Sentinel. */
      /* Limits */
      unsigned int maxclients;        /* Max number of simultaneous clients */
      unsigned long long maxmemory;   /* Max number of memory bytes to use */
      list *unblocked_clients; /* list of clients to unblock before next loop */
      /* Sort parameters - qsort_r() is only available under BSD so we
       * have to take this state global, in order to pass it to sortCompare() */
 -    int sort_dontsort;
      int sort_desc;
      int sort_alpha;
      int sort_bypattern;
@@@ -1124,12 -1113,6 +1124,12 @@@ void clusterCron(void)
  clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
  void clusterPropagatePublish(robj *channel, robj *message);
  
 +/* Sentinel */
 +void initSentinelConfig(void);
 +void initSentinel(void);
 +void sentinelTimer(void);
 +char *sentinelHandleConfiguration(char **argv, int argc);
 +
  /* Scripting */
  void scriptingInit(void);
  
@@@ -1295,10 -1278,4 +1295,10 @@@ void enableWatchdog(int period)
  void disableWatchdog(void);
  void watchdogScheduleSignal(int period);
  void redisLogHexDump(int level, char *descr, void *value, size_t len);
 +
 +#define redisDebug(fmt, ...) \
 +    printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__)
 +#define redisDebugMark() \
 +    printf("-- MARK %s:%d --\n", __FILE__, __LINE__)
 +
  #endif
diff --combined src/replication.c
index 72b88977afeb7aa5f7b161fc15a7445726aea2f3,3f7c2914dcecaeb2b52be4659a8fd6655eec0821..b43e9927f5886cc1c77ede523d38a68ba7fda599
@@@ -311,18 -311,16 +311,18 @@@ void replicationAbortSyncTransfer(void
  }
  
  /* Asynchronously read the SYNC payload we receive from a master */
 +#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
  void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
      char buf[4096];
      ssize_t nread, readlen;
 +    off_t left;
      REDIS_NOTUSED(el);
      REDIS_NOTUSED(privdata);
      REDIS_NOTUSED(mask);
  
 -    /* If repl_transfer_left == -1 we still have to read the bulk length
 +    /* If repl_transfer_size == -1 we still have to read the bulk length
       * from the master reply. */
 -    if (server.repl_transfer_left == -1) {
 +    if (server.repl_transfer_size == -1) {
          if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
              redisLog(REDIS_WARNING,
                  "I/O error reading bulk count from MASTER: %s",
              redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
              goto error;
          }
 -        server.repl_transfer_left = strtol(buf+1,NULL,10);
 +        server.repl_transfer_size = strtol(buf+1,NULL,10);
          redisLog(REDIS_NOTICE,
              "MASTER <-> SLAVE sync: receiving %ld bytes from master",
 -            server.repl_transfer_left);
 +            server.repl_transfer_size);
          return;
      }
  
      /* Read bulk data */
 -    readlen = (server.repl_transfer_left < (signed)sizeof(buf)) ?
 -        server.repl_transfer_left : (signed)sizeof(buf);
 +    left = server.repl_transfer_size - server.repl_transfer_read;
 +    readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
      nread = read(fd,buf,readlen);
      if (nread <= 0) {
          redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
          redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
          goto error;
      }
 -    server.repl_transfer_left -= nread;
 +    server.repl_transfer_read += nread;
 +
 +    /* Sync data on disk from time to time, otherwise at the end of the transfer
 +     * we may suffer a big delay as the memory buffers are copied into the
 +     * actual disk. */
 +    if (server.repl_transfer_read >=
 +        server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
 +    {
 +        off_t sync_size = server.repl_transfer_read -
 +                          server.repl_transfer_last_fsync_off;
 +        rdb_fsync_range(server.repl_transfer_fd,
 +            server.repl_transfer_last_fsync_off, sync_size);
 +        server.repl_transfer_last_fsync_off += sync_size;
 +    }
 +
      /* Check if the transfer is now complete */
 -    if (server.repl_transfer_left == 0) {
 +    if (server.repl_transfer_read == server.repl_transfer_size) {
          if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
              redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
              replicationAbortSyncTransfer();
@@@ -483,8 -467,6 +483,8 @@@ char *sendSynchronousCommand(int fd, ..
  void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
      char tmpfile[256], *err;
      int dfd, maxtries = 5;
 +    int sockerr = 0;
 +    socklen_t errlen = sizeof(sockerr);
      REDIS_NOTUSED(el);
      REDIS_NOTUSED(privdata);
      REDIS_NOTUSED(mask);
          return;
      }
  
 -    redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
 -    /* This event should only be triggered once since it is used to have a
 -     * non-blocking connect(2) to the master. It has been triggered when this
 -     * function is called, so we can delete it. */
 -    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
 +    /* Check for errors in the socket. */
 +    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
 +        sockerr = errno;
 +    if (sockerr) {
 +        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
 +        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
 +            strerror(sockerr));
 +        goto error;
 +    }
 +
 +    /* If we were connecting, it's time to send a non blocking PING, we want to
 +     * make sure the master is able to reply before going into the actual
 +     * replication process where we have long timeouts in the order of
 +     * seconds (in the meantime the slave would block). */
 +    if (server.repl_state == REDIS_REPL_CONNECTING) {
 +        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
 +        /* Delete the writable event so that the readable event remains
 +         * registered and we can wait for the PONG reply. */
 +        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
 +        server.repl_state = REDIS_REPL_RECEIVE_PONG;
 +        /* Send the PING, don't check for errors at all, we have the timeout
 +         * that will take care about this. */
 +        syncWrite(fd,"PING\r\n",6,100);
 +        return;
 +    }
 +
 +    /* Receive the PONG command. */
 +    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
 +        char buf[1024];
 +
 +        /* Delete the readable event, we no longer need it now that there is
 +         * the PING reply to read. */
 +        aeDeleteFileEvent(server.el,fd,AE_READABLE);
 +
 +        /* Read the reply with explicit timeout. */
 +        buf[0] = '\0';
 +        if (syncReadLine(fd,buf,sizeof(buf),
 +            server.repl_syncio_timeout*1000) == -1)
 +        {
 +            redisLog(REDIS_WARNING,
 +                "I/O error reading PING reply from master: %s",
 +                strerror(errno));
 +            goto error;
 +        }
 +
 +        /* We don't care about the reply, it can be +PONG or an error since
 +         * the server requires AUTH. As long as it replies correctly, it's
 +         * fine from our point of view. */
 +        if (buf[0] != '-' && buf[0] != '+') {
 +            redisLog(REDIS_WARNING,"Unexpected reply to PING from master.");
 +            goto error;
 +        } else {
 +            redisLog(REDIS_NOTICE,
 +                "Master replied to PING, replication can continue...");
 +        }
 +    }
  
      /* AUTH with the master if required. */
      if(server.masterauth) {
      }
  
      server.repl_state = REDIS_REPL_TRANSFER;
 -    server.repl_transfer_left = -1;
 +    server.repl_transfer_size = -1;
 +    server.repl_transfer_read = 0;
 +    server.repl_transfer_last_fsync_off = 0;
      server.repl_transfer_fd = dfd;
      server.repl_transfer_lastio = server.unixtime;
      server.repl_transfer_tmpfile = zstrdup(tmpfile);
      return;
  
  error:
 -    server.repl_state = REDIS_REPL_CONNECT;
      close(fd);
 +    server.repl_transfer_s = -1;
 +    server.repl_state = REDIS_REPL_CONNECT;
      return;
  }
  
@@@ -651,8 -579,7 +651,8 @@@ int connectWithMaster(void) 
  void undoConnectWithMaster(void) {
      int fd = server.repl_transfer_s;
  
 -    redisAssert(server.repl_state == REDIS_REPL_CONNECTING);
 +    redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
 +                server.repl_state == REDIS_REPL_RECEIVE_PONG);
      aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
      close(fd);
      server.repl_transfer_s = -1;
@@@ -668,8 -595,7 +668,8 @@@ void slaveofCommand(redisClient *c) 
              if (server.master) freeClient(server.master);
              if (server.repl_state == REDIS_REPL_TRANSFER)
                  replicationAbortSyncTransfer();
 -            else if (server.repl_state == REDIS_REPL_CONNECTING)
 +            else if (server.repl_state == REDIS_REPL_CONNECTING ||
 +                     server.repl_state == REDIS_REPL_RECEIVE_PONG)
                  undoConnectWithMaster();
              server.repl_state = REDIS_REPL_NONE;
              redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
  
  void replicationCron(void) {
      /* Non blocking connection timeout? */
 -    if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTING &&
 +    if (server.masterhost &&
 +        (server.repl_state == REDIS_REPL_CONNECTING ||
 +         server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
          (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
      {
          redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
       * So slaves can implement an explicit timeout to masters, and will
       * be able to detect a link disconnection even if the TCP connection
       * will not actually go down. */
-     if (!(server.cronloops % (server.repl_ping_slave_period*10))) {
+     if (!(server.cronloops % (server.repl_ping_slave_period * REDIS_HZ))) {
          listIter li;
          listNode *ln;