]> git.saurik.com Git - redis.git/blobdiff - src/redis.c
Refuse writes if can't persist on disk.
[redis.git] / src / redis.c
index 647c586736e3961796ddc4cc3fdb51c6672764ac..3dfef024f1083991a405f2312c7c5621a7abcb4f 100644 (file)
@@ -242,7 +242,8 @@ struct redisCommand redisCommandTable[] = {
     {"eval",evalCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
     {"evalsha",evalShaCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
     {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
-    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}
+    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
+    {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}
 };
 
 /*============================ Utility functions ============================ */
@@ -832,6 +833,8 @@ void createSharedObjects(void) {
         "-LOADING Redis is loading the dataset in memory\r\n"));
     shared.slowscripterr = createObject(REDIS_STRING,sdsnew(
         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
+    shared.bgsaveerr = createObject(REDIS_STRING,sdsnew(
+        "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Write commands are disabled. Please check Redis logs for details about the error.\r\n"));
     shared.space = createObject(REDIS_STRING,sdsnew(" "));
     shared.colon = createObject(REDIS_STRING,sdsnew(":"));
     shared.plus = createObject(REDIS_STRING,sdsnew("+"));
@@ -852,6 +855,8 @@ void createSharedObjects(void) {
     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
     shared.del = createStringObject("DEL",3);
+    shared.rpop = createStringObject("RPOP",4);
+    shared.lpop = createStringObject("LPOP",4);
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
@@ -962,6 +967,7 @@ void initServerConfig() {
     populateCommandTable();
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
+    server.lpushCommand = lookupCommandByCString("lpush");
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -1084,6 +1090,7 @@ void initServer() {
     server.stat_fork_time = 0;
     server.stat_rejected_conn = 0;
     server.unixtime = time(NULL);
+    server.lastbgsave_status = REDIS_OK;
     aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
     if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
         acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
@@ -1160,6 +1167,43 @@ void resetCommandTableStats(void) {
     }
 }
 
+/* ========================== Redis OP Array API ============================ */
+
+void redisOpArrayInit(redisOpArray *oa) {
+    oa->ops = NULL;
+    oa->numops = 0;
+}
+
+int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
+                       robj **argv, int argc, int target)
+{
+    redisOp *op;
+
+    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
+    op = oa->ops+oa->numops;
+    op->cmd = cmd;
+    op->dbid = dbid;
+    op->argv = argv;
+    op->argc = argc;
+    op->target = target;
+    oa->numops++;
+    return oa->numops;
+}
+
+void redisOpArrayFree(redisOpArray *oa) {
+    while(oa->numops) {
+        int j;
+        redisOp *op;
+
+        oa->numops--;
+        op = oa->ops+oa->numops;
+        for (j = 0; j < op->argc; j++)
+            decrRefCount(op->argv[j]);
+        zfree(op->argv);
+    }
+    zfree(oa->ops);
+}
+
 /* ====================== Commands lookup and execution ===================== */
 
 struct redisCommand *lookupCommand(sds name) {
@@ -1192,6 +1236,14 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
         replicationFeedSlaves(server.slaves,dbid,argv,argc);
 }
 
+/* Used inside commands to schedule the propagation of additional commands
+ * after the current command is propagated to AOF / Replication. */
+void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+                   int target)
+{
+    redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target);
+}
+
 /* Call() is the core of Redis execution of a command */
 void call(redisClient *c, int flags) {
     long long dirty, start = ustime(), duration;
@@ -1199,9 +1251,10 @@ void call(redisClient *c, int flags) {
     /* Sent the command to clients in MONITOR mode, only if the commands are
      * not geneated from reading an AOF. */
     if (listLength(server.monitors) && !server.loading)
-        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
+        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
 
     /* Call the command. */
+    redisOpArrayInit(&server.also_propagate);
     dirty = server.dirty;
     c->cmd->proc(c);
     dirty = server.dirty-dirty;
@@ -1232,6 +1285,18 @@ void call(redisClient *c, int flags) {
         if (flags != REDIS_PROPAGATE_NONE)
             propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
     }
+    /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
+     * PUSH command. */
+    if (server.also_propagate.numops) {
+        int j;
+        redisOp *rop;
+
+        for (j = 0; j < server.also_propagate.numops; j++) {
+            rop = &server.also_propagate.ops[j];
+            propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
+        }
+        redisOpArrayFree(&server.also_propagate);
+    }
     server.stat_numcommands++;
 }
 
@@ -1312,6 +1377,14 @@ int processCommand(redisClient *c) {
         }
     }
 
+    /* Don't accept write commands if there are problems persisting on disk. */
+    if (server.saveparamslen > 0 && server.lastbgsave_status == REDIS_ERR &&
+        c->cmd->flags & REDIS_CMD_WRITE)
+    {
+        addReply(c, shared.bgsaveerr);
+        return REDIS_OK;
+    }
+
     /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
     if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
         &&
@@ -1444,6 +1517,17 @@ void echoCommand(redisClient *c) {
     addReplyBulk(c,c->argv[1]);
 }
 
+void timeCommand(redisClient *c) {
+    struct timeval tv;
+
+    /* gettimeofday() can only fail if &tv is a bad address so we
+     * don't check for errors. */
+    gettimeofday(&tv,NULL);
+    addReplyMultiBulkLen(c,2);
+    addReplyBulkLongLong(c,tv.tv_sec);
+    addReplyBulkLongLong(c,tv.tv_usec);
+}
+
 /* Convert an amount of bytes into a human readable string in the form
  * of 100B, 2G, 100M, 4K, and so forth. */
 void bytesToHuman(char *s, unsigned long long n) {
@@ -1572,12 +1656,14 @@ sds genRedisInfoString(char *section) {
             "changes_since_last_save:%lld\r\n"
             "bgsave_in_progress:%d\r\n"
             "last_save_time:%ld\r\n"
+            "last_bgsave_status:%s\r\n"
             "bgrewriteaof_in_progress:%d\r\n",
             server.loading,
             server.aof_state != REDIS_AOF_OFF,
             server.dirty,
             server.rdb_child_pid != -1,
             server.lastsave,
+            server.lastbgsave_status == REDIS_OK ? "ok" : "err",
             server.aof_child_pid != -1);
 
         if (server.aof_state != REDIS_AOF_OFF) {