+/*
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
#include "redis.h"
+void signalListAsReady(redisClient *c, robj *key);
+
/*-----------------------------------------------------------------------------
* List API
*----------------------------------------------------------------------------*/
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
}
+/* The function pushes an element to the specified list object 'subject',
+ * at head or tail position as specified by 'where'.
+ *
+ * There is no need for the caller to increment the refcount of 'value' as
+ * the function takes care of it if needed. */
void listTypePush(robj *subject, robj *value, int where) {
/* Check if we need to convert the ziplist */
listTypeTryConversion(subject,value);
*----------------------------------------------------------------------------*/
void pushGenericCommand(redisClient *c, int where) {
- int j, addlen = 0, pushed = 0;
+ int j, waiting = 0, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
int may_have_waiting_clients = (lobj == NULL);
return;
}
+ if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
+
for (j = 2; j < c->argc; j++) {
c->argv[j] = tryObjectEncoding(c->argv[j]);
- if (may_have_waiting_clients) {
- if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
- addlen++;
- continue;
- } else {
- may_have_waiting_clients = 0;
- }
- }
if (!lobj) {
lobj = createZiplistObject();
dbAdd(c->db,c->argv[1],lobj);
listTypePush(lobj,c->argv[j],where);
pushed++;
}
- addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
+ addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
if (pushed) signalModifiedKey(c->db,c->argv[1]);
server.dirty += pushed;
}
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/
-void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
- robj *aux;
-
- if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
- /* Create the list if the key does not exist */
- if (!dstobj) {
- dstobj = createZiplistObject();
- dbAdd(c->db,dstkey,dstobj);
- } else {
- signalModifiedKey(c->db,dstkey);
- }
- listTypePush(dstobj,value,REDIS_HEAD);
- /* If we are pushing as a result of LPUSH against a key
- * watched by BRPOPLPUSH, we need to rewrite the command vector
- * as an LPUSH.
- *
- * If this is called directly by RPOPLPUSH (either directly
- * or via a BRPOPLPUSH where the popped list exists)
- * we should replicate the RPOPLPUSH command itself. */
- if (c != origclient) {
- aux = createStringObject("LPUSH",5);
- rewriteClientCommandVector(origclient,3,aux,dstkey,value);
- decrRefCount(aux);
- } else {
- /* Make sure to always use RPOPLPUSH in the replication / AOF,
- * even if the original command was BRPOPLPUSH. */
- aux = createStringObject("RPOPLPUSH",9);
- rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
- decrRefCount(aux);
- }
- server.dirty++;
+/* Push 'value' at the head of the list stored at 'dstkey' in c->db (the
+ * PUSH side of RPOPLPUSH / BRPOPLPUSH), then reply to 'c' with 'value'.
+ *
+ * 'dstobj' is the current value of 'dstkey', or NULL when the key does not
+ * exist: in that case a new list is created and the key is signaled as
+ * ready, so that clients blocked on 'dstkey' can be served. Callers are
+ * expected to have already checked that an existing 'dstobj' is a list.
+ *
+ * NOTE(review): this function neither increments server.dirty nor
+ * propagates anything; callers presumably take care of both — confirm,
+ * in particular for the blocked-client path. */
+void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+ /* Create the list if the key does not exist */
+ if (!dstobj) {
+ dstobj = createZiplistObject();
+ dbAdd(c->db,dstkey,dstobj);
+ /* A brand new list may have clients blocked waiting on it. */
+ signalListAsReady(c,dstkey);
}
-
+ signalModifiedKey(c->db,dstkey);
+ listTypePush(dstobj,value,REDIS_HEAD);
/* Always send the pushed value to the client. */
addReplyBulk(c,value);
}
checkType(c,sobj,REDIS_LIST)) return;
if (listTypeLength(sobj) == 0) {
+ /* This may only happen after loading very old RDB files. Recent
+ * versions of Redis delete keys of empty lists. */
addReply(c,shared.nullbulk);
} else {
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = listTypePop(sobj,REDIS_TAIL);
/* We saved touched key, and protect it, since rpoplpushHandlePush
- * may change the client command argument vector. */
+ * may change the client command argument vector (it does not
+ * currently). */
incrRefCount(touchedkey);
- rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
+ rpoplpushHandlePush(c,c->argv[2],dobj,value);
/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);
* Blocking POP operations
*----------------------------------------------------------------------------*/
-/* Currently Redis blocking operations support is limited to list POP ops,
- * so the current implementation is not fully generic, but it is also not
- * completely specific so it will not require a rewrite to support new
- * kind of blocking operations in the future.
- *
- * Still it's important to note that list blocking operations can be already
- * used as a notification mechanism in order to implement other blocking
- * operations at application level, so there must be a very strong evidence
- * of usefulness and generality before new blocking operations are implemented.
- *
- * This is how the current blocking POP works, we use BLPOP as example:
+/* This is how the current blocking POP works, using BLPOP as an example:
* - If the user calls BLPOP and the key exists and contains a non empty list
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
- * if there is not to block.
+ * if blocking is not required.
* - If instead BLPOP is called and the key does not exists or the list is
* empty we need to block. In order to do so we remove the notification for
* new data to read in the client socket (so that we'll not serve new
* in a dictionary (db->blocking_keys) mapping keys to a list of clients
* blocking for this keys.
* - If a PUSH operation against a key with blocked clients waiting is
- * performed, we serve the first in the list: basically instead to push
- * the new element inside the list we return it to the (first / oldest)
- * blocking client, unblock the client, and remove it form the list.
- *
- * The above comment and the source code should be enough in order to understand
- * the implementation and modify / fix it later.
+ * performed, we mark this key as "ready", and after the current command,
+ * MULTI/EXEC block, or script is executed, we serve the clients waiting
+ * for this list, from the one that blocked first to the last, for as
+ * many clients as there are elements in the ready list.
*/
/* Set a client in blocking mode for the specified key, with the specified
* timeout */
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
+ dict *added;
dictEntry *de;
list *l;
- int j;
+ int j, i;
c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
- c->bpop.count = numkeys;
c->bpop.timeout = timeout;
c->bpop.target = target;
- if (target != NULL) {
- incrRefCount(target);
- }
+ if (target != NULL) incrRefCount(target);
+
+ /* Create a dictionary that we use to avoid adding duplicated keys
+ * in case the user calls something like: "BLPOP foo foo foo 0".
+ * The rest of the implementation is simpler if we know there are no
+ * duplications in the key waiting list. */
+ added = dictCreate(&setDictType,NULL);
+ i = 0; /* The index for c->bpop.keys[...], we can't use the j loop
+ variable as the list of keys may have duplicated elements. */
for (j = 0; j < numkeys; j++) {
+ /* Add the key in the "added" dictionary to make sure there are
+ * no duplicated keys. */
+ if (dictAdd(added,keys[j],NULL) != DICT_OK) continue;
+ incrRefCount(keys[j]);
+
/* Add the key in the client structure, to map clients -> keys */
- c->bpop.keys[j] = keys[j];
+ c->bpop.keys[i++] = keys[j];
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
}
listAddNodeTail(l,c);
}
+ c->bpop.count = i;
+
/* Mark the client as a blocked client */
c->flags |= REDIS_BLOCKED;
server.bpop_blocked_clients++;
+ dictRelease(added);
}
/* Unblock a client that's waiting in a blocking operation such as BLPOP */
listAddNodeTail(server.unblocked_clients,c);
}
-/* This should be called from any function PUSHing into lists.
- * 'c' is the "pushing client", 'key' is the key it is pushing data against,
- * 'ele' is the element pushed.
+/* If the specified key has clients blocked waiting for list pushes, this
+ * function will put the key reference into the server.ready_keys list.
+ * Note that db->ready_keys is a hash table that allows us to avoid putting
+ * the same key again and again in the list in case of multiple pushes
+ * made by a script or in the context of MULTI/EXEC.
+ *
+ * The list will be finally processed by handleClientsBlockedOnLists().
+ *
+ * Reference counting: 'key' gets one extra reference for the
+ * server.ready_keys entry and one for the db->ready_keys entry, so the
+ * caller keeps ownership of the reference it already holds. */
+void signalListAsReady(redisClient *c, robj *key) {
+    readyList *rl;
+    int retval;
+
+    /* No clients blocking for this key? No need to queue it. */
+    if (dictFind(c->db->blocking_keys,key) == NULL) return;
+
+    /* Key was already signaled? No need to queue it again. */
+    if (dictFind(c->db->ready_keys,key) != NULL) return;
+
+    /* Ok, we need to queue this key into server.ready_keys. */
+    rl = zmalloc(sizeof(*rl));
+    rl->key = key;
+    rl->db = c->db;
+    incrRefCount(key);
+    listAddNodeTail(server.ready_keys,rl);
+
+    /* We also add the key in the db->ready_keys dictionary in order
+     * to avoid adding it multiple times into a list with a simple O(1)
+     * check. The insertion is performed outside of the assertion so that
+     * it can never be compiled away together with the check. */
+    incrRefCount(key);
+    retval = dictAdd(c->db->ready_keys,key,NULL);
+    redisAssertWithInfo(c,key,retval == DICT_OK);
+}
+
+/* This is a helper function for handleClientsBlockedOnLists(). Its job
+ * is to serve a specific client (receiver) that is blocked on 'key'
+ * in the context of the specified 'db', doing the following:
*
- * If the function returns 0 there was no client waiting for a list push
- * against this key.
+ * 1) Provide the client with the 'value' element.
+ * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
+ * 'value' element on the destination list (the LPUSH side of the command).
+ * 3) Propagate the resulting LPOP or RPOP, and the additional LPUSH if any,
+ * into the AOF and replication channel.
*
- * If the function returns 1 there was a client waiting for a list push
- * against this key, the element was passed to this client thus it's not
- * needed to actually add it to the list and the caller should return asap. */
-int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
- struct dictEntry *de;
- redisClient *receiver;
- int numclients;
- list *clients;
- listNode *ln;
- robj *dstkey, *dstobj;
-
- de = dictFind(c->db->blocking_keys,key);
- if (de == NULL) return 0;
- clients = dictGetVal(de);
- numclients = listLength(clients);
-
- /* Try to handle the push as long as there are clients waiting for a push.
- * Note that "numclients" is used because the list of clients waiting for a
- * push on "key" is deleted by unblockClient() when empty.
- *
- * This loop will have more than 1 iteration when there is a BRPOPLPUSH
- * that cannot push the target list because it does not contain a list. If
- * this happens, it simply tries the next client waiting for a push. */
- while (numclients--) {
- ln = listFirst(clients);
- redisAssertWithInfo(c,key,ln != NULL);
- receiver = ln->value;
- dstkey = receiver->bpop.target;
-
- /* Protect receiver->bpop.target, that will be freed by
- * the next unblockClientWaitingData() call. */
- if (dstkey) incrRefCount(dstkey);
-
- /* This should remove the first element of the "clients" list. */
- unblockClientWaitingData(receiver);
-
- if (dstkey == NULL) {
- /* BRPOP/BLPOP */
- addReplyMultiBulkLen(receiver,2);
- addReplyBulk(receiver,key);
- addReplyBulk(receiver,ele);
- return 1; /* Serve just the first client as in B[RL]POP semantics */
+ * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
+ * 'value' element was popped from the head (BLPOP) or tail (BRPOP and
+ * BRPOPLPUSH) so that we can propagate the command properly.
+ *
+ * The function returns REDIS_OK if we are able to serve the client, otherwise
+ * REDIS_ERR is returned to signal the caller that the list POP operation
+ * should be undone as the client was not served: this only happens for
+ * a BRPOPLPUSH that fails to push the value to the destination key because
+ * it is of the wrong type. */
+int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
+{
+ /* Scratch argument vector for propagate(): at most 3 entries are
+ * needed (LPUSH dstkey value). */
+ robj *argv[3];
+
+ if (dstkey == NULL) {
+ /* Propagate the [LR]POP operation. */
+ argv[0] = (where == REDIS_HEAD) ? shared.lpop :
+ shared.rpop;
+ argv[1] = key;
+ propagate((where == REDIS_HEAD) ?
+ server.lpopCommand : server.rpopCommand,
+ db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+
+ /* BRPOP/BLPOP */
+ addReplyMultiBulkLen(receiver,2);
+ addReplyBulk(receiver,key);
+ addReplyBulk(receiver,value);
+ } else {
+ /* BRPOPLPUSH: the PUSH side targets 'dstkey'.
+ * NOTE(review): receiver->db is assumed to be the same database
+ * as 'db' here — confirm. */
+ robj *dstobj =
+ lookupKeyWrite(receiver->db,dstkey);
+ /* Serve the client when the destination does not exist (it will
+ * be created) or already holds a list. Otherwise checkType() is
+ * presumably what replies to 'receiver' with the type error
+ * (TODO confirm), and the caller must undo the pop. */
+ if (!(dstobj &&
+ checkType(receiver,dstobj,REDIS_LIST)))
+ {
+ /* Propagate the RPOP operation. */
+ argv[0] = shared.rpop;
+ argv[1] = key;
+ propagate(server.rpopCommand,
+ db->id,argv,2,
+ REDIS_PROPAGATE_AOF|
+ REDIS_PROPAGATE_REPL);
+ /* Perform the PUSH side and reply to the receiver with 'value'. */
+ rpoplpushHandlePush(receiver,dstkey,dstobj,
+ value);
+ /* Propagate the LPUSH operation. */
+ argv[0] = shared.lpush;
+ argv[1] = dstkey;
+ argv[2] = value;
+ propagate(server.lpushCommand,
+ db->id,argv,3,
+ REDIS_PROPAGATE_AOF|
+ REDIS_PROPAGATE_REPL);
} else {
- /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
- dstobj = lookupKeyWrite(receiver->db,dstkey);
- if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
- rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
- decrRefCount(dstkey);
- return 1;
- }
- decrRefCount(dstkey);
+ /* BRPOPLPUSH failed because of wrong
+ * destination type. */
+ return REDIS_ERR;
}
}
+ return REDIS_OK;
+}
- return 0;
+/* Serve the clients blocked on the key described by 'rl', in the order in
+ * which they blocked, popping one element from the list for each of them
+ * until either the waiting clients or the list elements run out. If the
+ * list is left empty the key is deleted. Nothing happens when the key no
+ * longer exists or no longer holds a list. */
+static void serveClientsBlockedOnReadyKey(readyList *rl) {
+    robj *o = lookupKeyWrite(rl->db,rl->key);
+    dictEntry *de;
+
+    if (o == NULL || o->type != REDIS_LIST) return;
+
+    de = dictFind(rl->db->blocking_keys,rl->key);
+    if (de != NULL) {
+        list *clients = dictGetVal(de);
+        int remaining;
+
+        /* Snapshot the count up front: unblockClientWaitingData() removes
+         * the served client from 'clients' (and may free the list once it
+         * is empty), so the list itself cannot bound the loop. */
+        for (remaining = listLength(clients); remaining > 0; remaining--) {
+            listNode *head = listFirst(clients);
+            redisClient *receiver = head->value;
+            robj *dstkey = receiver->bpop.target;
+            int where;
+            robj *value;
+
+            if (receiver->lastcmd && receiver->lastcmd->proc == blpopCommand)
+                where = REDIS_HEAD;
+            else
+                where = REDIS_TAIL;
+
+            value = listTypePop(o,where);
+            if (value == NULL) break; /* No more elements to hand out. */
+
+            /* unblockClientWaitingData() frees receiver->bpop.target, so
+             * hold our own reference to it while serving the client. */
+            if (dstkey) incrRefCount(dstkey);
+            unblockClientWaitingData(receiver);
+
+            /* A client that could not be served (BRPOPLPUSH against a
+             * destination of the wrong type) gets its element put back on
+             * the side of the list it was popped from. */
+            if (serveClientBlockedOnList(receiver,rl->key,dstkey,rl->db,
+                                         value,where) == REDIS_ERR)
+                listTypePush(o,value,where);
+
+            if (dstkey) decrRefCount(dstkey);
+            decrRefCount(value);
+        }
+    }
+
+    /* signalModifiedKey() is not called here: it already was when the
+     * element was pushed on the list. */
+    if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
+}
+
+/* Serve every client blocked on a key that became "ready".
+ *
+ * Redis should call this every time a single command, a MULTI/EXEC block,
+ * or a Lua script terminates its execution after being called by a client.
+ * Keys that have at least one blocked client and that received at least
+ * one new element via some PUSH operation are accumulated into the
+ * server.ready_keys list. The function keeps iterating until that list is
+ * empty, since serving a BRPOPLPUSH can, through its PUSH side, make more
+ * keys ready. */
+void handleClientsBlockedOnLists(void) {
+    while (listLength(server.ready_keys) != 0) {
+        /* Detach the pending keys and install a fresh list, so that the
+         * signalListAsReady() calls made while serving BRPOPLPUSH clients
+         * are collected for the next round. */
+        list *batch = server.ready_keys;
+        server.ready_keys = listCreate();
+
+        while (listLength(batch) != 0) {
+            listNode *node = listFirst(batch);
+            readyList *rl = node->value;
+
+            /* Forget the key in db->ready_keys first, so that it can be
+             * signaled again while its clients are being served. */
+            dictDelete(rl->db->ready_keys,rl->key);
+            serveClientsBlockedOnReadyKey(rl);
+
+            /* Release the queued entry and its reference to the key. */
+            decrRefCount(rl->key);
+            zfree(rl);
+            listDelNode(batch,node);
+        }
+        listRelease(batch); /* server.ready_keys already holds the new list. */
+    }
}
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
return REDIS_ERR;
}
- if (tval > 0) tval += time(NULL);
+ if (tval > 0) tval += server.unixtime;
*timeout = tval;
return REDIS_OK;
return;
} else {
if (listTypeLength(o) != 0) {
- /* If the list contains elements fall back to the usual
- * non-blocking POP operation */
- struct redisCommand *orig_cmd;
- robj *argv[2], **orig_argv;
- int orig_argc;
-
- /* We need to alter the command arguments before to call
- * popGenericCommand() as the command takes a single key. */
- orig_argv = c->argv;
- orig_argc = c->argc;
- orig_cmd = c->cmd;
- argv[1] = c->argv[j];
- c->argv = argv;
- c->argc = 2;
-
- /* Also the return value is different, we need to output
- * the multi bulk reply header and the key name. The
- * "real" command will add the last element (the value)
- * for us. If this souds like an hack to you it's just
- * because it is... */
- addReplyMultiBulkLen(c,2);
- addReplyBulk(c,argv[1]);
-
- popGenericCommand(c,where);
-
- /* Fix the client structure with the original stuff */
- c->argv = orig_argv;
- c->argc = orig_argc;
- c->cmd = orig_cmd;
+ /* Non-empty list: this is like a normal, non-blocking [LR]POP. */
+ robj *value = listTypePop(o,where);
+ redisAssert(value != NULL);
+ addReplyMultiBulkLen(c,2);
+ addReplyBulk(c,c->argv[j]);
+ addReplyBulk(c,value);
+ decrRefCount(value);
+ if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
+ signalModifiedKey(c->db,c->argv[j]);
+ server.dirty++;
+
+ /* Replicate it as an [LR]POP instead of B[LR]POP. */
+ rewriteClientCommandVector(c,2,
+ (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
+ c->argv[j]);
return;
}
}
if (key == NULL) {
if (c->flags & REDIS_MULTI) {
-
/* Blocking against an empty list in a multi state
* returns immediately. */
addReply(c, shared.nullbulk);
if (key->type != REDIS_LIST) {
addReply(c, shared.wrongtypeerr);
} else {
-
/* The list exists and has elements, so
* the regular rpoplpushCommand is executed. */
redisAssertWithInfo(c,key,listTypeLength(key) > 0);