]> git.saurik.com Git - redis.git/blobdiff - src/redis.c
Better MONITOR output, now includes client ip:port or the lua string if the command...
[redis.git] / src / redis.c
index 7b023dc576e6c812c5ec21c5f39d8a13ccbbd097..6d5522e824c93b0ada93e823f7a3d80a174ff943 100644 (file)
@@ -242,7 +242,8 @@ struct redisCommand redisCommandTable[] = {
     {"eval",evalCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
     {"evalsha",evalShaCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
     {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
-    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}
+    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
+    {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}
 };
 
 /*============================ Utility functions ============================ */
@@ -852,6 +853,8 @@ void createSharedObjects(void) {
     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
     shared.del = createStringObject("DEL",3);
+    shared.rpop = createStringObject("RPOP",4);
+    shared.lpop = createStringObject("LPOP",4);
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
@@ -962,6 +965,7 @@ void initServerConfig() {
     populateCommandTable();
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
+    server.lpushCommand = lookupCommandByCString("lpush");
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -1160,6 +1164,43 @@ void resetCommandTableStats(void) {
     }
 }
 
+/* ========================== Redis OP Array API ============================ */
+
+/* Initialize 'oa' as an empty op array: no ops buffer allocated and zero
+ * operations queued. */
+void redisOpArrayInit(redisOpArray *oa) {
+    oa->numops = 0;
+    oa->ops = NULL;
+}
+
+/* Append one operation to the op array 'oa', growing its buffer by a single
+ * slot.
+ *
+ * The 'argv' vector is stored by pointer, not copied; redisOpArrayFree()
+ * later releases both the vector and a reference on each of its elements.
+ *
+ * Returns the number of operations in the array after the append. */
+int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
+                       robj **argv, int argc, int target)
+{
+    int count = oa->numops+1;
+    redisOp *slot;
+
+    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*count);
+    slot = &oa->ops[count-1];
+    slot->cmd = cmd;
+    slot->dbid = dbid;
+    slot->argv = argv;
+    slot->argc = argc;
+    slot->target = target;
+    oa->numops = count;
+    return count;
+}
+
+/* Release everything held by the op array 'oa'.
+ *
+ * For each queued op, one reference is dropped on every argv element and the
+ * argv vector itself is freed; then the ops buffer is freed. On return the
+ * array is back in the empty state (ops == NULL, numops == 0), so it can be
+ * appended to again without a prior redisOpArrayInit(). */
+void redisOpArrayFree(redisOpArray *oa) {
+    while(oa->numops) {
+        int j;
+        redisOp *op;
+
+        oa->numops--;
+        op = oa->ops+oa->numops;
+        for (j = 0; j < op->argc; j++)
+            decrRefCount(op->argv[j]);
+        zfree(op->argv);
+    }
+    zfree(oa->ops);
+    /* redisOpArrayAppend() hands oa->ops to zrealloc(): never leave the
+     * freed address behind. */
+    oa->ops = NULL;
+}
+
 /* ====================== Commands lookup and execution ===================== */
 
 struct redisCommand *lookupCommand(sds name) {
@@ -1175,10 +1216,42 @@ struct redisCommand *lookupCommandByCString(char *s) {
     return cmd;
 }
 
+/* Propagate the specified command (in the context of the specified database
+ * id) to the AOF and to the slaves.
+ *
+ * 'flags' is a bitwise OR of:
+ * + REDIS_PROPAGATE_NONE (no propagation of the command at all)
+ * + REDIS_PROPAGATE_AOF (feed the AOF, only if AOF is not off)
+ * + REDIS_PROPAGATE_REPL (feed the slaves, only if there is at least one)
+ */
+void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+               int flags)
+{
+    int to_aof = (flags & REDIS_PROPAGATE_AOF) != 0;
+    int to_repl = (flags & REDIS_PROPAGATE_REPL) != 0;
+
+    if (to_aof && server.aof_state != REDIS_AOF_OFF) {
+        feedAppendOnlyFile(cmd,dbid,argv,argc);
+    }
+    if (to_repl && listLength(server.slaves)) {
+        replicationFeedSlaves(server.slaves,dbid,argv,argc);
+    }
+}
+
+/* Used inside command implementations to schedule the propagation of an
+ * additional command: the op is queued in server.also_propagate, which
+ * call() drains through propagate() after handling the current command.
+ *
+ * 'target' is later passed to propagate() as its flags argument. The op
+ * count returned by redisOpArrayAppend() is not needed here. */
+void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+                   int target)
+{
+    redisOpArray *queue = &server.also_propagate;
+
+    (void) redisOpArrayAppend(queue,cmd,dbid,argv,argc,target);
+}
+
 /* Call() is the core of Redis execution of a command */
 void call(redisClient *c, int flags) {
     long long dirty, start = ustime(), duration;
 
+    /* Send the command to clients in MONITOR mode, only if the commands are
+     * not generated from reading an AOF. */
+    if (listLength(server.monitors) && !server.loading)
+        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
+
+    /* Call the command. */
+    redisOpArrayInit(&server.also_propagate);
     dirty = server.dirty;
     c->cmd->proc(c);
     dirty = server.dirty-dirty;
@@ -1189,20 +1262,37 @@ void call(redisClient *c, int flags) {
     if (server.loading && c->flags & REDIS_LUA_CLIENT)
         flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);
 
+    /* Log the command into the Slow log if needed, and populate the
+     * per-command statistics that we show in INFO commandstats. */
     if (flags & REDIS_CALL_SLOWLOG)
         slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
     if (flags & REDIS_CALL_STATS) {
         c->cmd->microseconds += duration;
         c->cmd->calls++;
     }
+
+    /* Propagate the command into the AOF and replication link */
     if (flags & REDIS_CALL_PROPAGATE) {
-        if (server.aof_state != REDIS_AOF_OFF && dirty > 0)
-            feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
-        if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
-            listLength(server.slaves))
-            replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
-        if (listLength(server.monitors))
-            replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
+        int flags = REDIS_PROPAGATE_NONE;
+
+        if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
+            flags |= REDIS_PROPAGATE_REPL;
+        if (dirty)
+            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
+        if (flags != REDIS_PROPAGATE_NONE)
+            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
+    }
+    /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
+     * PUSH command. */
+    if (server.also_propagate.numops) {
+        int j;
+        redisOp *rop;
+
+        for (j = 0; j < server.also_propagate.numops; j++) {
+            rop = &server.also_propagate.ops[j];
+            propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
+        }
+        redisOpArrayFree(&server.also_propagate);
     }
     server.stat_numcommands++;
 }
@@ -1416,6 +1506,17 @@ void echoCommand(redisClient *c) {
     addReplyBulk(c,c->argv[1]);
 }
 
+/* TIME command: reply with a two-element multi bulk holding the current
+ * time as reported by gettimeofday(): first the seconds, then the
+ * microseconds part. */
+void timeCommand(redisClient *c) {
+    struct timeval now;
+    long long seconds, microseconds;
+
+    /* gettimeofday() can only fail if &now is a bad address, so we
+     * don't check for errors. */
+    gettimeofday(&now,NULL);
+    seconds = (long long) now.tv_sec;
+    microseconds = (long long) now.tv_usec;
+
+    addReplyMultiBulkLen(c,2);
+    addReplyBulkLongLong(c,seconds);
+    addReplyBulkLongLong(c,microseconds);
+}
+
 /* Convert an amount of bytes into a human readable string in the form
  * of 100B, 2G, 100M, 4K, and so forth. */
 void bytesToHuman(char *s, unsigned long long n) {
@@ -1802,8 +1903,8 @@ int freeMemoryIfNeeded(void) {
     size_t mem_used, mem_tofree, mem_freed;
     int slaves = listLength(server.slaves);
 
-    /* Remove the size of slaves output buffers from the count of used
-     * memory. */
+    /* Remove the size of slaves output buffers and AOF buffer from the
+     * count of used memory. */
     mem_used = zmalloc_used_memory();
     if (slaves) {
         listIter li;
@@ -1819,6 +1920,10 @@ int freeMemoryIfNeeded(void) {
                 mem_used -= obuf_bytes;
         }
     }
+    if (server.aof_state != REDIS_AOF_OFF) {
+        mem_used -= sdslen(server.aof_buf);
+        mem_used -= sdslen(server.aof_rewrite_buf);
+    }
 
     /* Check if we are over the memory limit. */
     if (mem_used <= server.maxmemory) return REDIS_OK;
@@ -1828,7 +1933,6 @@ int freeMemoryIfNeeded(void) {
 
     /* Compute how much memory we need to free. */
     mem_tofree = mem_used - server.maxmemory;
-    printf("USED: %zu, TOFREE: %zu\n", mem_used, mem_tofree);
     mem_freed = 0;
     while (mem_freed < mem_tofree) {
         int j, k, keys_freed = 0;
@@ -1919,7 +2023,6 @@ int freeMemoryIfNeeded(void) {
                 delta = (long long) zmalloc_used_memory();
                 dbDelete(db,keyobj);
                 delta -= (long long) zmalloc_used_memory();
-                // printf("%lld\n",delta);
                 mem_freed += delta;
                 server.stat_evictedkeys++;
                 decrRefCount(keyobj);
@@ -1929,27 +2032,7 @@ int freeMemoryIfNeeded(void) {
                  * start spending so much time here that is impossible to
                  * deliver data to the slaves fast enough, so we force the
                  * transmission here inside the loop. */
-                if (slaves) {
-                    listIter li;
-                    listNode *ln;
-
-                    listRewind(server.slaves,&li);
-                    while((ln = listNext(&li))) {
-                        redisClient *slave = listNodeValue(ln);
-                        int events;
-
-                        events = aeGetFileEvents(server.el,slave->fd);
-                        printf("EVENTS: %d\n", events);
-                        if (events & AE_WRITABLE &&
-                            slave->replstate == REDIS_REPL_ONLINE &&
-                            listLength(slave->reply))
-                        {
-                            printf("SLAVE %d -> %d\n",
-                                slave->fd, (int) listLength(slave->reply));
-                            sendReplyToClient(server.el,slave->fd,slave,0);
-                        }
-                    }
-                }
+                if (slaves) flushSlavesOutputBuffers();
             }
         }
         if (!keys_freed) return REDIS_ERR; /* nothing to free... */