void pushGenericCommand(redisClient *c, int where) {
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
+ c->argv[2] = tryObjectEncoding(c->argv[2]);
if (lobj == NULL) {
if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
addReply(c,shared.cone);
return;
}
if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
+ touchWatchedKey(c->db,c->argv[1]);
addReply(c,shared.cone);
return;
}
}
listTypePush(lobj,c->argv[2],where);
addReplyLongLong(c,listTypeLength(lobj));
+ touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
}
if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
+ touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
} else {
/* Notify client of a failed insert */
}
} else {
listTypePush(subject,val,where);
+ touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
}
- addReplyUlong(c,listTypeLength(subject));
+ addReplyLongLong(c,listTypeLength(subject));
}
/* Head-side entry point of the "pushx" command pair.
 *
 * The value in argv[2] is passed through tryObjectEncoding() and the object
 * it returns is stored back into argv[2]; that same slot is then handed to
 * pushxGenericCommand() together with NULL as the second argument and
 * REDIS_HEAD as the last one. All replies are produced by the callee. */
void lpushxCommand(redisClient *c) {
+ c->argv[2] = tryObjectEncoding(c->argv[2]);
pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
}
/* Tail-side entry point of the "pushx" command pair.
 *
 * The value in argv[2] is passed through tryObjectEncoding() and the object
 * it returns is stored back into argv[2]; that same slot is then handed to
 * pushxGenericCommand() together with NULL as the second argument and
 * REDIS_TAIL as the last one. All replies are produced by the callee. */
void rpushxCommand(redisClient *c) {
+ c->argv[2] = tryObjectEncoding(c->argv[2]);
pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
}
void linsertCommand(redisClient *c) {
+ c->argv[4] = tryObjectEncoding(c->argv[4]);
if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
} else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
/* Reply to the client with the length of the list found at argv[1].
 *
 * The key is fetched with lookupKeyReadOrReply(), which is given shared.czero
 * as its reply argument.
 * NOTE(review): presumably that helper sends shared.czero itself when the key
 * is missing -- confirm against its definition. */
void llenCommand(redisClient *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
/* Stop here when the lookup returned NULL or when checkType() reports a
 * non-zero result for REDIS_LIST. */
if (o == NULL || checkType(c,o,REDIS_LIST)) return;
- addReplyUlong(c,listTypeLength(o));
+ addReplyLongLong(c,listTypeLength(o));
}
void lindexCommand(redisClient *c) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,REDIS_LIST)) return;
int index = atoi(c->argv[2]->ptr);
- robj *value = c->argv[3];
+ robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
listTypeTryConversion(o,value);
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
decrRefCount(value);
addReply(c,shared.ok);
+ touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
}
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
listNodeValue(ln) = value;
incrRefCount(value);
addReply(c,shared.ok);
+ touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
}
} else {
addReplyBulk(c,value);
decrRefCount(value);
if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
+ touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
}
}
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
- if (end < 0) end = 0;
- /* indexes sanity checks */
+ /* Invariant: start >= 0, so this test will be true when end < 0.
+ * The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
- /* Out of range start or start > end result in empty list */
addReply(c,shared.emptymultibulk);
return;
}
rangelen = (end-start)+1;
/* Return the result in form of a multi-bulk reply */
- addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
+ addReplyMultiBulkLen(c,rangelen);
listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL);
for (j = 0; j < rangelen; j++) {
redisAssert(listTypeNext(li,&entry));
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
- if (end < 0) end = 0;
- /* indexes sanity checks */
+ /* Invariant: start >= 0, so this test will be true when end < 0.
+ * The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
/* Out of range start or start > end result in empty list */
ltrim = llen;
redisPanic("Unknown list encoding");
}
if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
+ touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.ok);
}
void lremCommand(redisClient *c) {
- robj *subject, *obj = c->argv[3];
+ robj *subject, *obj;
+ obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
int toremove = atoi(c->argv[2]->ptr);
int removed = 0;
listTypeEntry entry;
decrRefCount(obj);
if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
- addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
+ addReplyLongLong(c,removed);
+ if (removed) touchWatchedKey(c->db,c->argv[1]);
}
/* This is the semantic of this command:
* since the element is not just returned but pushed against another list
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/
-void rpoplpushcommand(redisClient *c) {
+void rpoplpushCommand(redisClient *c) {
robj *sobj, *value;
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,sobj,REDIS_LIST)) return;
/* Delete the source list when it is empty */
if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
+ touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
}
}
/* Set a client in blocking mode for the specified key, with the specified
* timeout */
-void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
+void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;
- c->blocking_keys = zmalloc(sizeof(robj*)*numkeys);
- c->blocking_keys_num = numkeys;
- c->blockingto = timeout;
+ c->bstate.keys = zmalloc(sizeof(robj*)*numkeys);
+ c->bstate.count = numkeys;
+ c->bstate.timeout = timeout;
+ c->bstate.target = target;
+
+ if (target != NULL) {
+ incrRefCount(target);
+ }
+
for (j = 0; j < numkeys; j++) {
/* Add the key in the client structure, to map clients -> keys */
- c->blocking_keys[j] = keys[j];
+ c->bstate.keys[j] = keys[j];
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
list *l;
int j;
- redisAssert(c->blocking_keys != NULL);
+ redisAssert(c->bstate.keys != NULL);
/* The client may wait for multiple keys, so unblock it for every key. */
- for (j = 0; j < c->blocking_keys_num; j++) {
+ for (j = 0; j < c->bstate.count; j++) {
/* Remove this client from the list of clients waiting for this key. */
- de = dictFind(c->db->blocking_keys,c->blocking_keys[j]);
+ de = dictFind(c->db->blocking_keys,c->bstate.keys[j]);
redisAssert(de != NULL);
l = dictGetEntryVal(de);
listDelNode(l,listSearchKey(l,c));
/* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0)
- dictDelete(c->db->blocking_keys,c->blocking_keys[j]);
- decrRefCount(c->blocking_keys[j]);
+ dictDelete(c->db->blocking_keys,c->bstate.keys[j]);
+ decrRefCount(c->bstate.keys[j]);
+ }
+
+ if (c->bstate.target != NULL) {
+ decrRefCount(c->bstate.target);
}
+
/* Cleanup the client structure */
- zfree(c->blocking_keys);
- c->blocking_keys = NULL;
+ zfree(c->bstate.keys);
+ c->bstate.keys = NULL;
+ c->bstate.target = NULL;
c->flags &= (~REDIS_BLOCKED);
server.blpop_blocked_clients--;
/* We want to process data if there is some command waiting
redisAssert(ln != NULL);
receiver = ln->value;
- addReplySds(receiver,sdsnew("*2\r\n"));
- addReplyBulk(receiver,key);
- addReplyBulk(receiver,ele);
+ if (receiver->bstate.target == NULL) {
+ addReplyMultiBulkLen(receiver,2);
+ addReplyBulk(receiver,key);
+ addReplyBulk(receiver,ele);
+ }
+ else {
+ robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target);
+ if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
+
+ addReplyBulk(receiver,ele);
+
+ /* Create the list if the key does not exist */
+ if (!dobj) {
+ dobj = createZiplistObject();
+ dbAdd(receiver->db,receiver->bstate.target,dobj);
+ }
+
+ listTypePush(dobj,ele,REDIS_HEAD);
+ }
+
unblockClientWaitingData(receiver);
return 1;
}
time_t timeout;
int j;
+ if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) {
+ return;
+ }
+
for (j = 1; j < c->argc-1; j++) {
o = lookupKeyWrite(c->db,c->argv[j]);
if (o != NULL) {
* "real" command will add the last element (the value)
* for us. If this sounds like a hack to you it's just
* because it is... */
- addReplySds(c,sdsnew("*2\r\n"));
+ addReplyMultiBulkLen(c,2);
addReplyBulk(c,argv[1]);
+
popGenericCommand(c,where);
/* Fix the client structure with the original stuff */
c->argv = orig_argv;
c->argc = orig_argc;
+
return;
}
}
}
}
+
+ /* If we are inside a MULTI/EXEC and the list is empty the only thing
+ * we can do is treat it as a timeout (even with timeout 0). */
+ if (c->flags & REDIS_MULTI) {
+ addReply(c,shared.nullmultibulk);
+ return;
+ }
+
/* If the list is empty or the key does not exist we must block */
- timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
if (timeout > 0) timeout += time(NULL);
- blockForKeys(c,c->argv+1,c->argc-2,timeout);
+ blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
+}
+
+/* Read the timeout argument of a blocking command.
+ *
+ * 'object' is converted with getLongLongFromObject(). When that call
+ * returns REDIS_OK and the value is not below zero, the value is stored in
+ * '*timeout' and REDIS_OK is returned. Otherwise addReplyError() is called
+ * on 'c' with a message naming the problem, '*timeout' is left untouched
+ * and REDIS_ERR is returned.
+ *
+ * The value is stored exactly as given; the callers in this file add
+ * time(NULL) to it themselves when it is greater than zero. */
+int checkTimeout(redisClient *c, robj *object, time_t *timeout) {
+    long long parsed;
+
+    if (getLongLongFromObject(object, &parsed) == REDIS_OK) {
+        if (parsed >= 0) {
+            *timeout = parsed;
+            return REDIS_OK;
+        }
+        addReplyError(c, "timeout is negative");
+    } else {
+        addReplyError(c, "timeout is not an integer");
+    }
+
+    return REDIS_ERR;
}
void blpopCommand(redisClient *c) {
/* Tail-side blocking pop entry point: a thin wrapper that forwards the client
 * to blockingPopGenericCommand() with REDIS_TAIL as the side argument. */
void brpopCommand(redisClient *c) {
blockingPopGenericCommand(c,REDIS_TAIL);
}
+
+/* Blocking variant of rpoplpushCommand().
+ *
+ * Arguments as read here: argv[1] is the source key, argv[2] is handed to
+ * blockForKeys() as the target, argv[3] is the timeout.
+ *
+ *  - An invalid timeout ends the command; checkTimeout() has already sent
+ *    the error reply.
+ *  - If the source key exists but is not a list, shared.wrongtypeerr is sent.
+ *  - If the source key exists and is a list, the work is delegated to
+ *    rpoplpushCommand().
+ *  - If the source key is missing and the client has REDIS_MULTI set,
+ *    shared.nullmultibulk is sent instead of blocking.
+ *  - Otherwise the client is blocked on argv[1] via blockForKeys(); a
+ *    timeout greater than zero is first offset by time(NULL). */
+void brpoplpushCommand(redisClient *c) {
+    time_t expire;
+    robj *src;
+
+    if (checkTimeout(c, c->argv[3], &expire) != REDIS_OK) return;
+
+    src = lookupKeyWrite(c->db, c->argv[1]);
+
+    if (src != NULL) {
+        if (src->type == REDIS_LIST) {
+            /* A list that is present in the keyspace is expected to hold
+             * at least one element. */
+            redisAssert(listTypeLength(src) > 0);
+            rpoplpushCommand(c);
+        } else {
+            addReply(c, shared.wrongtypeerr);
+        }
+        return;
+    }
+
+    /* Source key is missing: reply at once when REDIS_MULTI is set. */
+    if (c->flags & REDIS_MULTI) {
+        addReply(c,shared.nullmultibulk);
+        return;
+    }
+
+    /* Zero is passed through unchanged; positive values get time(NULL) added. */
+    if (expire > 0) expire += time(NULL);
+    blockForKeys(c, c->argv + 1, 1, expire, c->argv[2]);
+}