]> git.saurik.com Git - redis.git/blobdiff - src/redis.c
Better MONITOR output, now includes client ip:port or the lua string if the command...
[redis.git] / src / redis.c
index 45ee07caf64cd348b727802d87330502ee177b13..6d5522e824c93b0ada93e823f7a3d80a174ff943 100644 (file)
@@ -242,7 +242,8 @@ struct redisCommand redisCommandTable[] = {
     {"eval",evalCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
     {"evalsha",evalShaCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
     {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
-    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}
+    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
+    {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}
 };
 
 /*============================ Utility functions ============================ */
@@ -852,6 +853,8 @@ void createSharedObjects(void) {
     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
     shared.del = createStringObject("DEL",3);
+    shared.rpop = createStringObject("RPOP",4);
+    shared.lpop = createStringObject("LPOP",4);
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
@@ -962,6 +965,7 @@ void initServerConfig() {
     populateCommandTable();
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
+    server.lpushCommand = lookupCommandByCString("lpush");
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -1160,6 +1164,43 @@ void resetCommandTableStats(void) {
     }
 }
 
+/* ========================== Redis OP Array API ============================ */
+
+void redisOpArrayInit(redisOpArray *oa) {
+    oa->ops = NULL;
+    oa->numops = 0;
+}
+
+int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
+                       robj **argv, int argc, int target)
+{
+    redisOp *op;
+
+    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
+    op = oa->ops+oa->numops;
+    op->cmd = cmd;
+    op->dbid = dbid;
+    op->argv = argv;
+    op->argc = argc;
+    op->target = target;
+    oa->numops++;
+    return oa->numops;
+}
+
+void redisOpArrayFree(redisOpArray *oa) {
+    while(oa->numops) {
+        int j;
+        redisOp *op;
+
+        oa->numops--;
+        op = oa->ops+oa->numops;
+        for (j = 0; j < op->argc; j++)
+            decrRefCount(op->argv[j]);
+        zfree(op->argv);
+    }
+    zfree(oa->ops);
+}
+
 /* ====================== Commands lookup and execution ===================== */
 
 struct redisCommand *lookupCommand(sds name) {
@@ -1175,10 +1216,42 @@ struct redisCommand *lookupCommandByCString(char *s) {
     return cmd;
 }
 
+/* Propagate the specified command (in the context of the specified database id)
+ * to AOF, Slaves and Monitors.
+ *
+ * flags are an xor between:
+ * + REDIS_PROPAGATE_NONE (no propagation of command at all)
+ * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
+ * + REDIS_PROPAGATE_REPL (propagate into the replication link)
+ */
+void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+               int flags)
+{
+    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
+        feedAppendOnlyFile(cmd,dbid,argv,argc);
+    if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
+        replicationFeedSlaves(server.slaves,dbid,argv,argc);
+}
+
+/* Used inside commands to schedule the propagation of additional commands
+ * after the current command is propagated to AOF / Replication. */
+void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+                   int target)
+{
+    redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target);
+}
+
 /* Call() is the core of Redis execution of a command */
 void call(redisClient *c, int flags) {
     long long dirty, start = ustime(), duration;
 
+    /* Sent the command to clients in MONITOR mode, only if the commands are
+     * not geneated from reading an AOF. */
+    if (listLength(server.monitors) && !server.loading)
+        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
+
+    /* Call the command. */
+    redisOpArrayInit(&server.also_propagate);
     dirty = server.dirty;
     c->cmd->proc(c);
     dirty = server.dirty-dirty;
@@ -1189,20 +1262,37 @@ void call(redisClient *c, int flags) {
     if (server.loading && c->flags & REDIS_LUA_CLIENT)
         flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);
 
+    /* Log the command into the Slow log if needed, and populate the
+     * per-command statistics that we show in INFO commandstats. */
     if (flags & REDIS_CALL_SLOWLOG)
         slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
     if (flags & REDIS_CALL_STATS) {
         c->cmd->microseconds += duration;
         c->cmd->calls++;
     }
+
+    /* Propagate the command into the AOF and replication link */
     if (flags & REDIS_CALL_PROPAGATE) {
-        if (server.aof_state != REDIS_AOF_OFF && dirty > 0)
-            feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
-        if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
-            listLength(server.slaves))
-            replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
-        if (listLength(server.monitors))
-            replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
+        int flags = REDIS_PROPAGATE_NONE;
+
+        if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
+            flags |= REDIS_PROPAGATE_REPL;
+        if (dirty)
+            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
+        if (flags != REDIS_PROPAGATE_NONE)
+            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
+    }
+    /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
+     * PUSH command. */
+    if (server.also_propagate.numops) {
+        int j;
+        redisOp *rop;
+
+        for (j = 0; j < server.also_propagate.numops; j++) {
+            rop = &server.also_propagate.ops[j];
+            propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
+        }
+        redisOpArrayFree(&server.also_propagate);
     }
     server.stat_numcommands++;
 }
@@ -1275,12 +1365,13 @@ int processCommand(redisClient *c) {
      * First we try to free some memory if possible (if there are volatile
      * keys in the dataset). If there are not the only thing we can do
      * is returning an error. */
-    if (server.maxmemory) freeMemoryIfNeeded();
-    if (server.maxmemory && (c->cmd->flags & REDIS_CMD_DENYOOM) &&
-        zmalloc_used_memory() > server.maxmemory)
-    {
-        addReplyError(c,"command not allowed when used memory > 'maxmemory'");
-        return REDIS_OK;
+    if (server.maxmemory) {
+        int retval = freeMemoryIfNeeded();
+        if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
+            addReplyError(c,
+                "command not allowed when used memory > 'maxmemory'");
+            return REDIS_OK;
+        }
     }
 
     /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
@@ -1415,6 +1506,17 @@ void echoCommand(redisClient *c) {
     addReplyBulk(c,c->argv[1]);
 }
 
+void timeCommand(redisClient *c) {
+    struct timeval tv;
+
+    /* gettimeofday() can only fail if &tv is a bad addresss so we
+     * don't check for errors. */
+    gettimeofday(&tv,NULL);
+    addReplyMultiBulkLen(c,2);
+    addReplyBulkLongLong(c,tv.tv_sec);
+    addReplyBulkLongLong(c,tv.tv_usec);
+}
+
 /* Convert an amount of bytes into a human readable string in the form
  * of 100B, 2G, 100M, 4K, and so forth. */
 void bytesToHuman(char *s, unsigned long long n) {
@@ -1783,23 +1885,57 @@ void monitorCommand(redisClient *c) {
 /* ============================ Maxmemory directive  ======================== */
 
 /* This function gets called when 'maxmemory' is set on the config file to limit
- * the max memory used by the server, and we are out of memory.
- * This function will try to, in order:
+ * the max memory used by the server, before processing a command.
  *
- * - Free objects from the free list
- * - Try to remove keys with an EXPIRE set
+ * The goal of the function is to free enough memory to keep Redis under the
+ * configured memory limit.
  *
- * It is not possible to free enough memory to reach used-memory < maxmemory
- * the server will start refusing commands that will enlarge even more the
- * memory usage.
+ * The function starts calculating how many bytes should be freed to keep
+ * Redis under the limit, and enters a loop selecting the best keys to
+ * evict accordingly to the configured policy.
+ *
+ * If all the bytes needed to return back under the limit were freed the
+ * function returns REDIS_OK, otherwise REDIS_ERR is returned, and the caller
+ * should block the execution of commands that will result in more memory
+ * used by the server.
  */
-void freeMemoryIfNeeded(void) {
-    /* Remove keys accordingly to the active policy as long as we are
-     * over the memory limit. */
-    if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION) return;
+int freeMemoryIfNeeded(void) {
+    size_t mem_used, mem_tofree, mem_freed;
+    int slaves = listLength(server.slaves);
+
+    /* Remove the size of slaves output buffers and AOF buffer from the
+     * count of used memory. */
+    mem_used = zmalloc_used_memory();
+    if (slaves) {
+        listIter li;
+        listNode *ln;
+
+        listRewind(server.slaves,&li);
+        while((ln = listNext(&li))) {
+            redisClient *slave = listNodeValue(ln);
+            unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
+            if (obuf_bytes > mem_used)
+                mem_used = 0;
+            else
+                mem_used -= obuf_bytes;
+        }
+    }
+    if (server.aof_state != REDIS_AOF_OFF) {
+        mem_used -= sdslen(server.aof_buf);
+        mem_used -= sdslen(server.aof_rewrite_buf);
+    }
 
-    while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
-        int j, k, freed = 0;
+    /* Check if we are over the memory limit. */
+    if (mem_used <= server.maxmemory) return REDIS_OK;
+
+    if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
+        return REDIS_ERR; /* We need to free memory, but policy forbids. */
+
+    /* Compute how much memory we need to free. */
+    mem_tofree = mem_used - server.maxmemory;
+    mem_freed = 0;
+    while (mem_freed < mem_tofree) {
+        int j, k, keys_freed = 0;
 
         for (j = 0; j < server.dbnum; j++) {
             long bestval = 0; /* just to prevent warning */
@@ -1872,16 +2008,36 @@ void freeMemoryIfNeeded(void) {
 
             /* Finally remove the selected key. */
             if (bestkey) {
+                long long delta;
+
                 robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
                 propagateExpire(db,keyobj);
+                /* We compute the amount of memory freed by dbDelete() alone.
+                 * It is possible that actually the memory needed to propagate
+                 * the DEL in AOF and replication link is greater than the one
+                 * we are freeing removing the key, but we can't account for
+                 * that otherwise we would never exit the loop.
+                 *
+                 * AOF and Output buffer memory will be freed eventually so
+                 * we only care about memory used by the key space. */
+                delta = (long long) zmalloc_used_memory();
                 dbDelete(db,keyobj);
+                delta -= (long long) zmalloc_used_memory();
+                mem_freed += delta;
                 server.stat_evictedkeys++;
                 decrRefCount(keyobj);
-                freed++;
+                keys_freed++;
+
+                /* When the memory to free starts to be big enough, we may
+                 * start spending so much time here that is impossible to
+                 * deliver data to the slaves fast enough, so we force the
+                 * transmission here inside the loop. */
+                if (slaves) flushSlavesOutputBuffers();
             }
         }
-        if (!freed) return; /* nothing to free... */
+        if (!keys_freed) return REDIS_ERR; /* nothing to free... */
     }
+    return REDIS_OK;
 }
 
 /* =================================== Main! ================================ */