populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
+ server.lpushCommand = lookupCommandByCString("lpush");
/* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
}
}
+/* ========================== Redis OP Array API ============================ */
+
+/* Put an operations array into its empty state: zero queued operations and
+ * no backing storage. */
+void redisOpArrayInit(redisOpArray *oa) {
+    oa->numops = 0;
+    oa->ops = NULL;
+}
+
+/* Append one operation to the array, growing its storage by a single slot.
+ *
+ * The argv pointer is stored as given; no copy is made here. Returns the
+ * number of operations the array holds after the append. */
+int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
+                       robj **argv, int argc, int target)
+{
+    redisOp *slot;
+
+    /* Make room for exactly one more element, then fill the new tail slot. */
+    oa->ops = zrealloc(oa->ops, sizeof(redisOp) * (oa->numops + 1));
+    slot = &oa->ops[oa->numops];
+
+    slot->cmd = cmd;
+    slot->dbid = dbid;
+    slot->argv = argv;
+    slot->argc = argc;
+    slot->target = target;
+
+    oa->numops += 1;
+    return oa->numops;
+}
+
+/* Release everything owned by an operations array.
+ *
+ * For every queued operation this drops one reference on each argv element
+ * and frees the argv vector itself; finally the ops storage is freed.
+ * On return the array is in the same empty state redisOpArrayInit()
+ * produces (numops == 0, ops == NULL), so freeing it again or appending to
+ * it afterwards never touches released memory. */
+void redisOpArrayFree(redisOpArray *oa) {
+    while (oa->numops) {
+        int j;
+        redisOp *op;
+
+        /* Walk from the tail so numops always equals the live count. */
+        oa->numops--;
+        op = oa->ops+oa->numops;
+        for (j = 0; j < op->argc; j++)
+            decrRefCount(op->argv[j]);
+        zfree(op->argv);
+    }
+    zfree(oa->ops);
+    /* Do not leave a dangling pointer behind in the (possibly global) struct. */
+    oa->ops = NULL;
+}
+
/* ====================== Commands lookup and execution ===================== */
struct redisCommand *lookupCommand(sds name) {
return cmd;
}
+/* Feed the specified command (executed in the context of database 'dbid')
+ * to the AOF and/or to the replication link.
+ *
+ * 'flags' is a bitwise OR of:
+ * + REDIS_PROPAGATE_NONE (no propagation of command at all)
+ * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
+ * + REDIS_PROPAGATE_REPL (propagate into the replication link)
+ *
+ * The AOF is fed only when server.aof_state is not REDIS_AOF_OFF, and the
+ * replication link only when server.slaves is not empty.
+ */
+void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+               int flags)
+{
+    int want_aof = (flags & REDIS_PROPAGATE_AOF) != 0;
+    int want_repl = (flags & REDIS_PROPAGATE_REPL) != 0;
+
+    if (server.aof_state != REDIS_AOF_OFF) {
+        if (want_aof) {
+            feedAppendOnlyFile(cmd,dbid,argv,argc);
+        }
+    }
+
+    if (want_repl) {
+        if (listLength(server.slaves)) {
+            replicationFeedSlaves(server.slaves,dbid,argv,argc);
+        }
+    }
+}
+
+/* Queue an additional command for propagation. Meant to be called from
+ * inside a command implementation: the operation is appended to
+ * server.also_propagate, which call() propagates and frees once the current
+ * command has itself been propagated to AOF / Replication.
+ *
+ * The queue is released with redisOpArrayFree(), which decrements the
+ * refcount of every argv element and frees argv, so the caller hands
+ * ownership of both over to the queue. */
+void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+                   int target)
+{
+    redisOpArray *queue = &server.also_propagate;
+
+    (void) redisOpArrayAppend(queue, cmd, dbid, argv, argc, target);
+}
+
/* Call() is the core of Redis execution of a command.
 *
 * Steps visible here: feed clients in MONITOR mode, run c->cmd->proc(c)
 * while measuring how much server.dirty changed, optionally record the
 * slow log entry and per-command statistics, propagate the command to
 * AOF / replication, and finally propagate any extra operations that were
 * queued in server.also_propagate while the command ran.
 *
 * 'flags' is a mask of REDIS_CALL_SLOWLOG, REDIS_CALL_STATS and
 * REDIS_CALL_PROPAGATE selecting which of the optional steps are run. */
void call(redisClient *c, int flags) {
long long dirty, start = ustime(), duration;
+ /* Send the command to clients in MONITOR mode, only if the commands are
+ * not generated from reading an AOF (i.e. the server is not loading). */
+ if (listLength(server.monitors) && !server.loading)
+ replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
+
+ /* Call the command. The also_propagate queue is reset first so that it
+ * only holds operations scheduled while this command runs.
+ * NOTE(review): server.also_propagate is shared state; if proc() can
+ * re-enter call(), the inner invocation would reset and free the queue
+ * of the outer one -- confirm this cannot happen. */
+ redisOpArrayInit(&server.also_propagate);
dirty = server.dirty;
c->cmd->proc(c);
dirty = server.dirty-dirty;
if (server.loading && c->flags & REDIS_LUA_CLIENT)
flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);
+ /* Log the command into the Slow log if needed, and populate the
+ * per-command statistics that we show in INFO commandstats. */
if (flags & REDIS_CALL_SLOWLOG)
slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
if (flags & REDIS_CALL_STATS) {
c->cmd->microseconds += duration;
c->cmd->calls++;
}
+
+ /* Propagate the command into the AOF and replication link.
+ * A command flagged REDIS_CMD_FORCE_REPLICATION is sent to the
+ * replication link even when nothing changed; a non-zero dirty delta
+ * selects both the replication link and the AOF.
+ * NOTE(review): the local 'flags' declared below shadows the 'flags'
+ * parameter for the rest of this block. */
if (flags & REDIS_CALL_PROPAGATE) {
- if (server.aof_state != REDIS_AOF_OFF && dirty > 0)
- feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
- if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
- listLength(server.slaves))
- replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
- if (listLength(server.monitors))
- replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
+ int flags = REDIS_PROPAGATE_NONE;
+
+ if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
+ flags |= REDIS_PROPAGATE_REPL;
+ if (dirty)
+ flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
+ if (flags != REDIS_PROPAGATE_NONE)
+ propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
+ }
+ /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
+ * PUSH command. Every queued operation is propagated in insertion order
+ * using its own target flags, then the queue (including the queued argv
+ * vectors) is released. */
+ if (server.also_propagate.numops) {
+ int j;
+ redisOp *rop;
+
+ for (j = 0; j < server.also_propagate.numops; j++) {
+ rop = &server.also_propagate.ops[j];
+ propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
+ }
+ redisOpArrayFree(&server.also_propagate);
}
server.stat_numcommands++;
}