populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
+ server.lpushCommand = lookupCommandByCString("lpush");
/* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
+/* Used inside commands to propagate an additional command if needed.
+ *
+ * The arguments are recorded in server.also_propagate, which is a single
+ * slot: the assertion below fails if an item is already pending, so a
+ * command can queue at most one additional command per execution.
+ *
+ * call() propagates the pending item after the command returns, then calls
+ * decrRefCount() on every element of 'argv' and zfree() on the 'argv' array
+ * itself. The caller must therefore pass a heap allocated vector and hold
+ * a reference on each object for call() to release.
+ *
+ * 'target' is a combination of REDIS_PROPAGATE_* flags and is passed as is
+ * to propagate(). */
+void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+ int target)
+{
+ propagatedItem *pi = &server.also_propagate;
+
+ redisAssert(pi->target == REDIS_PROPAGATE_NONE);
+ pi->cmd = cmd;
+ pi->dbid = dbid;
+ pi->argv = argv;
+ pi->argc = argc;
+ pi->target = target;
+}
+
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {
long long dirty, start = ustime(), duration;
replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
/* Call the command. */
+ server.also_propagate.target = REDIS_PROPAGATE_NONE;
dirty = server.dirty;
c->cmd->proc(c);
dirty = server.dirty-dirty;
if (flags != REDIS_PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}
+ /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
+ * PUSH command. */
+ if (server.also_propagate.target != REDIS_PROPAGATE_NONE) {
+ int j;
+ propagatedItem *pi = &server.also_propagate;
+
+ propagate(pi->cmd, pi->dbid, pi->argv, pi->argc, pi->target);
+ for (j = 0; j < pi->argc; j++) decrRefCount(pi->argv[j]);
+ zfree(pi->argv);
+ }
server.stat_numcommands++;
}
time_t soft_limit_seconds;
} clientBufferLimitsConfig;
+/* Currently only used to additionally propagate more commands to AOF/Replication
+ * after the propagation of the executed command.
+ * The structure contains everything needed to propagate a command:
+ * argv and argc, the ID of the database, pointer to the command table entry,
+ * and finally the target, that is a bitwise OR of REDIS_PROPAGATE_* flags
+ * (REDIS_PROPAGATE_NONE means that no command is pending). */
+typedef struct propagatedItem {
+ robj **argv;
+ int argc, dbid, target;
+ struct redisCommand *cmd;
+} propagatedItem;
+
/*-----------------------------------------------------------------------------
* Redis cluster data structures
*----------------------------------------------------------------------------*/
off_t loading_loaded_bytes;
time_t loading_start_time;
/* Fast pointers to often looked up command */
- struct redisCommand *delCommand, *multiCommand;
+ struct redisCommand *delCommand, *multiCommand, *lpushCommand;
int cronloops; /* Number of times the cron function run */
time_t lastsave; /* Unix time of last save succeeede */
/* Fields used only for stats */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
+ /* Propagation of commands in AOF / replication */
+ propagatedItem also_propagate; /* Additional command to propagate. */
/* Logging */
char *logfile; /* Path of log file */
int syslog_enabled; /* Is syslog enabled? */
struct redisCommand *lookupCommandByCString(char *s);
void call(redisClient *c, int flags);
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
+void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
int prepareForShutdown();
void redisLog(int level, const char *fmt, ...);
void redisLogRaw(int level, const char *msg);
addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
if (pushed) signalModifiedKey(c->db,c->argv[1]);
server.dirty += pushed;
+
+ /* Alter the replication of the command according to the number of
+ * list elements delivered to clients waiting in a blocking operation.
+ * We do that only if there were waiting clients, and only if at least one
+ * element was still pushed into the list (otherwise dirty is 0 and nothing
+ * will be propagated). */
+ if (waiting && pushed) {
+ /* CMD KEY a b C D E */
+ for (j = 2; j < pushed+2; j++)
+ rewriteClientCommandArgument(c,j,c->argv[j+waiting]);
+ c->argc -= waiting;
+ }
}
void lpushCommand(redisClient *c) {
*/
void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
- robj *aux;
-
if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
/* Create the list if the key does not exist */
if (!dstobj) {
signalModifiedKey(c->db,dstkey);
}
listTypePush(dstobj,value,REDIS_HEAD);
- /* If we are pushing as a result of LPUSH against a key
- * watched by BRPOPLPUSH, we need to rewrite the command vector
- * as an LPUSH.
- *
- * If this is called directly by RPOPLPUSH (either directly
- * or via a BRPOPLPUSH where the popped list exists)
- * we should replicate the RPOPLPUSH command itself. */
- if (c != origclient) {
- aux = createStringObject("LPUSH",5);
- rewriteClientCommandVector(origclient,3,aux,dstkey,value);
- decrRefCount(aux);
- } else {
- /* Make sure to always use RPOPLPUSH in the replication / AOF,
- * even if the original command was BRPOPLPUSH. */
- aux = createStringObject("RPOPLPUSH",9);
- rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
- decrRefCount(aux);
+ /* Additionally propagate this PUSH operation together with
+ * the operation performed by the command. */
+ {
+ robj **argv = zmalloc(sizeof(robj*)*3);
+ argv[0] = createStringObject("LPUSH",5);
+ argv[1] = dstkey;
+ argv[2] = value;
+ incrRefCount(argv[1]);
+ incrRefCount(argv[2]);
+ alsoPropagate(server.lpushCommand,c->db->id,argv,3,
+ REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
}
- server.dirty++;
}
-
/* Always send the pushed value to the client. */
addReplyBulk(c,value);
}
signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
+
+ /* Replicate this as a simple RPOP since the LPUSH side is replicated
+ * by the rpoplpushHandlePush() call if needed (it may not be needed
+ * if a client is blocked waiting for a push against the list). */
+ rewriteClientCommandVector(c,2,
+ resetRefCount(createStringObject("RPOP",4)),
+ c->argv[1]);
}
}