long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
- robj *blockingkey; /* The key we waiting to terminate a blocking
+ robj **blockingkeys; /* The keys we are waiting on to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
+ int blockingkeysnum; /* Number of blocking keys */
time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */
} redisClient;
{"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"rpop",rpopCommand,2,REDIS_CMD_INLINE},
{"lpop",lpopCommand,2,REDIS_CMD_INLINE},
- {"brpop",brpopCommand,3,REDIS_CMD_INLINE},
- {"blpop",blpopCommand,3,REDIS_CMD_INLINE},
+ {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
+ {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
{"llen",llenCommand,2,REDIS_CMD_INLINE},
{"lindex",lindexCommand,3,REDIS_CMD_INLINE},
{"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
freeClient(c);
} else if (c->flags & REDIS_BLOCKED) {
if (c->blockingto != 0 && c->blockingto < now) {
- addReply(c,shared.nullbulk);
+ addReply(c,shared.nullmultibulk);
unblockClient(c);
}
}
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
c->reply = listCreate();
- c->blockingkey = NULL;
+ c->blockingkeys = NULL;
+ c->blockingkeysnum = 0;
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
/* Set a client in blocking mode for the specified key, with the specified
* timeout */
-static void blockForKey(redisClient *c, robj *key, time_t timeout) {
+static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
dictEntry *de;
list *l;
+ int j;
- c->blockingkey = key;
- incrRefCount(key);
+ c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
+ c->blockingkeysnum = numkeys;
c->blockingto = timeout;
- de = dictFind(c->db->blockingkeys,key);
- if (de == NULL) {
- int retval;
+ for (j = 0; j < numkeys; j++) {
+ /* Add the key in the client structure, to map clients -> keys */
+ c->blockingkeys[j] = keys[j];
+ incrRefCount(keys[j]);
- /* We take a list of clients blocked for a given key */
- l = listCreate();
- retval = dictAdd(c->db->blockingkeys,key,l);
- incrRefCount(key);
- assert(retval == DICT_OK);
- } else {
- l = dictGetEntryVal(de);
+ /* And on the other "side", to map keys -> clients */
+ de = dictFind(c->db->blockingkeys,keys[j]);
+ if (de == NULL) {
+ int retval;
+
+ /* For every key we take a list of clients blocked for it */
+ l = listCreate();
+ retval = dictAdd(c->db->blockingkeys,keys[j],l);
+ incrRefCount(keys[j]);
+ assert(retval == DICT_OK);
+ } else {
+ l = dictGetEntryVal(de);
+ }
+ listAddNodeTail(l,c);
}
- /* Add this client to the list, and mark it as blocked */
- listAddNodeTail(l,c);
+ /* Mark the client as a blocked client */
c->flags |= REDIS_BLOCKED;
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
server.blockedclients++;
static void unblockClient(redisClient *c) {
dictEntry *de;
list *l;
+ int j;
- /* Remove this client from the list of clients waiting for this key. */
- assert(c->blockingkey != NULL);
- de = dictFind(c->db->blockingkeys,c->blockingkey);
- assert(de != NULL);
- l = dictGetEntryVal(de);
- listDelNode(l,listSearchKey(l,c));
- /* If the list is empty we need to remove it to avoid wasting memory */
- if (listLength(l) == 0)
- dictDelete(c->db->blockingkeys,c->blockingkey);
- /* Finally set the right flags in the client structure */
- decrRefCount(c->blockingkey);
- c->blockingkey = NULL;
+ assert(c->blockingkeys != NULL);
+ /* The client may wait for multiple keys, so unblock it for every key. */
+ for (j = 0; j < c->blockingkeysnum; j++) {
+ /* Remove this client from the list of clients waiting for this key. */
+ de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
+ assert(de != NULL);
+ l = dictGetEntryVal(de);
+ listDelNode(l,listSearchKey(l,c));
+ /* If the list is empty we need to remove it to avoid wasting memory */
+ if (listLength(l) == 0)
+ dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
+ decrRefCount(c->blockingkeys[j]);
+ }
+ /* Cleanup the client structure */
+ zfree(c->blockingkeys);
+ c->blockingkeys = NULL;
c->flags &= (~REDIS_BLOCKED);
server.blockedclients--;
/* Ok now we are ready to get read events from socket, note that we
assert(ln != NULL);
receiver = ln->value;
+ addReplySds(receiver,sdsnew("*2\r\n"));
+ addReplyBulkLen(receiver,key);
+ addReply(receiver,key);
+ addReply(receiver,shared.crlf);
addReplyBulkLen(receiver,ele);
addReply(receiver,ele);
addReply(receiver,shared.crlf);
static void blockingPopGenericCommand(redisClient *c, int where) {
robj *o;
time_t timeout;
+ int j;
- o = lookupKeyWrite(c->db,c->argv[1]);
- if (o != NULL) {
- if (o->type != REDIS_LIST) {
- popGenericCommand(c,where);
- return;
- } else {
- list *list = o->ptr;
- if (listLength(list) != 0) {
- /* If the list contains elements fall back to the usual
- * non-blocking POP operation */
- popGenericCommand(c,where);
+ for (j = 1; j < c->argc-1; j++) {
+ o = lookupKeyWrite(c->db,c->argv[j]);
+ if (o != NULL) {
+ if (o->type != REDIS_LIST) {
+ addReply(c,shared.wrongtypeerr);
return;
+ } else {
+ list *list = o->ptr;
+ if (listLength(list) != 0) {
+ /* If the list contains elements fall back to the usual
+ * non-blocking POP operation */
+ robj *argv[2], **orig_argv;
+ int orig_argc;
+
+ /* We need to alter the command arguments before calling
+ * popGenericCommand() as the command takes a single key. */
+ orig_argv = c->argv;
+ orig_argc = c->argc;
+ argv[1] = c->argv[j];
+ c->argv = argv;
+ c->argc = 2;
+
+ /* Also the return value is different: we need to output
+ * the multi bulk reply header and the key name. The
+ * "real" command will add the last element (the value)
+ * for us. If this sounds like a hack to you it's just
+ * because it is... */
+ addReplySds(c,sdsnew("*2\r\n"));
+ addReplyBulkLen(c,argv[1]);
+ addReply(c,argv[1]);
+ addReply(c,shared.crlf);
+ popGenericCommand(c,where);
+
+ /* Fix the client structure with the original stuff */
+ c->argv = orig_argv;
+ c->argc = orig_argc;
+ return;
+ }
}
}
}
/* If the list is empty or the key does not exists we must block */
- timeout = strtol(c->argv[2]->ptr,NULL,10);
+ timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
if (timeout > 0) timeout += time(NULL);
- blockForKey(c,c->argv[1],timeout);
+ blockForKeys(c,c->argv+1,c->argc-2,timeout);
}
static void blpopCommand(redisClient *c) {
redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
}
}
- // free(messages); Don't call free() with possibly corrupted memory.
+ /* free(messages); Don't call free() with possibly corrupted memory. */
exit(0);
}