}
/* Initialize an iterator at the specified index. */
-listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction) {
+listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) {
listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
li->subject = subject;
li->encoding = subject->encoding;
*----------------------------------------------------------------------------*/
void pushGenericCommand(redisClient *c, int where) {
- int j, addlen = 0, pushed = 0;
+ int j, waiting = 0, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
int may_have_waiting_clients = (lobj == NULL);
c->argv[j] = tryObjectEncoding(c->argv[j]);
if (may_have_waiting_clients) {
if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
- addlen++;
+ waiting++;
continue;
} else {
may_have_waiting_clients = 0;
listTypePush(lobj,c->argv[j],where);
pushed++;
}
- addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
+ addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
if (pushed) signalModifiedKey(c->db,c->argv[1]);
server.dirty += pushed;
+
+ /* Alter the replication of the command accordingly to the number of
+ * list elements delivered to clients waiting into a blocking operation.
+ * We do that only if there were waiting clients, and only if still some
+ * element was pushed into the list (othewise dirty is 0 and nothign will
+ * be propagated). */
+ if (waiting && pushed) {
+ /* CMD KEY a b C D E */
+ for (j = 0; j < waiting; j++) decrRefCount(c->argv[j+2]);
+ memmove(c->argv+2,c->argv+2+waiting,sizeof(robj*)*pushed);
+ c->argc -= waiting;
+ }
}
void lpushCommand(redisClient *c) {
void lrangeCommand(redisClient *c) {
robj *o;
- long start;
- long end;
- int llen;
- int rangelen;
+ long start, end, llen, rangelen;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
void ltrimCommand(redisClient *c) {
robj *o;
- long start;
- long end;
- int llen;
- int j, ltrim, rtrim;
+ long start, end, llen, j, ltrim, rtrim;
list *list;
listNode *ln;
robj *subject, *obj;
obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
long toremove;
- int removed = 0;
+ long removed = 0;
listTypeEntry entry;
if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
*/
void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
- robj *aux;
-
if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
/* Create the list if the key does not exist */
if (!dstobj) {
signalModifiedKey(c->db,dstkey);
}
listTypePush(dstobj,value,REDIS_HEAD);
- /* If we are pushing as a result of LPUSH against a key
- * watched by BRPOPLPUSH, we need to rewrite the command vector
- * as an LPUSH.
- *
- * If this is called directly by RPOPLPUSH (either directly
- * or via a BRPOPLPUSH where the popped list exists)
- * we should replicate the RPOPLPUSH command itself. */
- if (c != origclient) {
- aux = createStringObject("LPUSH",5);
- rewriteClientCommandVector(origclient,3,aux,dstkey,value);
- decrRefCount(aux);
- } else {
- /* Make sure to always use RPOPLPUSH in the replication / AOF,
- * even if the original command was BRPOPLPUSH. */
- aux = createStringObject("RPOPLPUSH",9);
- rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
- decrRefCount(aux);
+ /* Additionally propagate this PUSH operation together with
+ * the operation performed by the command. */
+ {
+ robj **argv = zmalloc(sizeof(robj*)*3);
+ argv[0] = createStringObject("LPUSH",5);
+ argv[1] = dstkey;
+ argv[2] = value;
+ incrRefCount(argv[1]);
+ incrRefCount(argv[2]);
+ alsoPropagate(server.lpushCommand,c->db->id,argv,3,
+ REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
}
- server.dirty++;
}
-
/* Always send the pushed value to the client. */
addReplyBulk(c,value);
}
signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
+
+ /* Replicate this as a simple RPOP since the LPUSH side is replicated
+ * by rpoplpushHandlePush() call if needed (it may not be needed
+ * if a client is blocking wait a push against the list). */
+ rewriteClientCommandVector(c,2,
+ resetRefCount(createStringObject("RPOP",4)),
+ c->argv[1]);
}
}
return;
} else {
if (listTypeLength(o) != 0) {
- /* If the list contains elements fall back to the usual
- * non-blocking POP operation */
- struct redisCommand *orig_cmd;
- robj *argv[2], **orig_argv;
- int orig_argc;
-
- /* We need to alter the command arguments before to call
- * popGenericCommand() as the command takes a single key. */
- orig_argv = c->argv;
- orig_argc = c->argc;
- orig_cmd = c->cmd;
- argv[1] = c->argv[j];
- c->argv = argv;
- c->argc = 2;
-
- /* Also the return value is different, we need to output
- * the multi bulk reply header and the key name. The
- * "real" command will add the last element (the value)
- * for us. If this souds like an hack to you it's just
- * because it is... */
- addReplyMultiBulkLen(c,2);
- addReplyBulk(c,argv[1]);
-
- popGenericCommand(c,where);
-
- /* Fix the client structure with the original stuff */
- c->argv = orig_argv;
- c->argc = orig_argc;
- c->cmd = orig_cmd;
+ /* Non empty list, this is like a non normal [LR]POP. */
+ robj *value = listTypePop(o,where);
+ redisAssert(value != NULL);
+ addReplyMultiBulkLen(c,2);
+ addReplyBulk(c,c->argv[j]);
+ addReplyBulk(c,value);
+ decrRefCount(value);
+ if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
+ signalModifiedKey(c->db,c->argv[j]);
+ server.dirty++;
+
+ /* Replicate it as an [LR]POP instead of B[LR]POP. */
+ rewriteClientCommandVector(c,2,
+ (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
+ c->argv[j]);
return;
}
}