#include "redis.h"
+void signalListAsReady(redisClient *c, robj *key);
+
/*-----------------------------------------------------------------------------
* List API
*----------------------------------------------------------------------------*/
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
}
+/* The function pushes an element to the specified list object 'subject',
+ * at head or tail position as specified by 'where'.
+ *
+ * There is no need for the caller to increment the refcount of 'value' as
+ * the function takes care of it if needed. */
void listTypePush(robj *subject, robj *value, int where) {
/* Check if we need to convert the ziplist */
listTypeTryConversion(subject,value);
return;
}
+ if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
+
for (j = 2; j < c->argc; j++) {
c->argv[j] = tryObjectEncoding(c->argv[j]);
- if (may_have_waiting_clients) {
- if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
- waiting++;
- continue;
- } else {
- may_have_waiting_clients = 0;
- }
- }
if (!lobj) {
lobj = createZiplistObject();
dbAdd(c->db,c->argv[1],lobj);
addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
if (pushed) signalModifiedKey(c->db,c->argv[1]);
server.dirty += pushed;
-
- /* Alter the replication of the command accordingly to the number of
- * list elements delivered to clients waiting into a blocking operation.
- * We do that only if there were waiting clients, and only if still some
- * element was pushed into the list (othewise dirty is 0 and nothign will
- * be propagated). */
- if (waiting && pushed) {
- /* CMD KEY a b C D E */
- for (j = 0; j < waiting; j++) decrRefCount(c->argv[j+2]);
- memmove(c->argv+2,c->argv+2+waiting,sizeof(robj*)*pushed);
- c->argc -= waiting;
- }
}
void lpushCommand(redisClient *c) {
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/
-void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
- if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
- /* Create the list if the key does not exist */
- if (!dstobj) {
- dstobj = createZiplistObject();
- dbAdd(c->db,dstkey,dstobj);
- } else {
- signalModifiedKey(c->db,dstkey);
- }
- listTypePush(dstobj,value,REDIS_HEAD);
- /* Additionally propagate this PUSH operation together with
- * the operation performed by the command. */
- {
- robj **argv = zmalloc(sizeof(robj*)*3);
- argv[0] = createStringObject("LPUSH",5);
- argv[1] = dstkey;
- argv[2] = value;
- incrRefCount(argv[1]);
- incrRefCount(argv[2]);
- alsoPropagate(server.lpushCommand,c->db->id,argv,3,
- REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
- }
+void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+ /* Create the list if the key does not exist. A freshly created
+ * destination list is also signaled as ready, so that clients blocked
+ * on 'dstkey' (if any) can be served once the current command is done.
+ * In every case the key is then signaled as modified and 'value' is
+ * pushed on the head of the destination list. */
+ if (!dstobj) {
+ dstobj = createZiplistObject();
+ dbAdd(c->db,dstkey,dstobj);
+ signalListAsReady(c,dstkey);
}
+ signalModifiedKey(c->db,dstkey);
+ listTypePush(dstobj,value,REDIS_HEAD);
/* Always send the pushed value to the client. */
addReplyBulk(c,value);
}
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = listTypePop(sobj,REDIS_TAIL);
/* We saved touched key, and protect it, since rpoplpushHandlePush
- * may change the client command argument vector. */
+ * may change the client command argument vector (it does not
+ * currently). */
incrRefCount(touchedkey);
- rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
+ rpoplpushHandlePush(c,c->argv[2],dobj,value);
/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);
signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
-
- /* Replicate this as a simple RPOP since the LPUSH side is replicated
- * by rpoplpushHandlePush() call if needed (it may not be needed
- * if a client is blocking wait a push against the list). */
- rewriteClientCommandVector(c,2,
- resetRefCount(createStringObject("RPOP",4)),
- c->argv[1]);
}
}
* Blocking POP operations
*----------------------------------------------------------------------------*/
-/* Currently Redis blocking operations support is limited to list POP ops,
- * so the current implementation is not fully generic, but it is also not
- * completely specific so it will not require a rewrite to support new
- * kind of blocking operations in the future.
- *
- * Still it's important to note that list blocking operations can be already
- * used as a notification mechanism in order to implement other blocking
- * operations at application level, so there must be a very strong evidence
- * of usefulness and generality before new blocking operations are implemented.
- *
- * This is how the current blocking POP works, we use BLPOP as example:
+/* This is how the current blocking POP works, we use BLPOP as example:
* - If the user calls BLPOP and the key exists and contains a non empty list
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
- * if there is not to block.
+ * if blocking is not required.
* - If instead BLPOP is called and the key does not exists or the list is
* empty we need to block. In order to do so we remove the notification for
* new data to read in the client socket (so that we'll not serve new
* in a dictionary (db->blocking_keys) mapping keys to a list of clients
* blocking for this keys.
* - If a PUSH operation against a key with blocked clients waiting is
- * performed, we serve the first in the list: basically instead to push
- * the new element inside the list we return it to the (first / oldest)
- * blocking client, unblock the client, and remove it form the list.
- *
- * The above comment and the source code should be enough in order to understand
- * the implementation and modify / fix it later.
+ * performed, we mark this key as "ready", and after the current command,
+ * MULTI/EXEC block, or script, is executed, we serve all the clients waiting
+ * for this list, from the one that blocked first, to the last, according
+ * to the number of elements we have in the ready list.
*/
/* Set a client in blocking mode for the specified key, with the specified
listAddNodeTail(server.unblocked_clients,c);
}
-/* This should be called from any function PUSHing into lists.
- * 'c' is the "pushing client", 'key' is the key it is pushing data against,
- * 'ele' is the element pushed.
+/* If the specified key has clients blocked waiting for list pushes, this
+ * function will put the key reference into the server.ready_keys list.
+ * Note that db->ready_keys is a hash table that allows us to avoid putting
+ * the same key again and again in the list in case of multiple pushes
+ * made by a script or in the context of MULTI/EXEC.
*
- * If the function returns 0 there was no client waiting for a list push
- * against this key.
+ * The list will be finally processed by handleClientsBlockedOnLists() */
+void signalListAsReady(redisClient *c, robj *key) {
+ readyList *rl;
+
+ /* No clients blocking for this key? No need to queue it. */
+ if (dictFind(c->db->blocking_keys,key) == NULL) return;
+
+ /* Key was already signaled? No need to queue it again. */
+ if (dictFind(c->db->ready_keys,key) != NULL) return;
+
+ /* Ok, we need to queue this key into server.ready_keys. The queued
+ * readyList entry records both the key and the database of the
+ * signaling client, and takes its own reference to the key. */
+ rl = zmalloc(sizeof(*rl));
+ rl->key = key;
+ rl->db = c->db;
+ incrRefCount(key);
+ listAddNodeTail(server.ready_keys,rl);
+
+ /* We also add the key in the db->ready_keys dictionary in order
+ * to avoid adding it multiple times into a list with a simple O(1)
+ * check. A second reference to the key is taken for the dictionary
+ * entry. The add is asserted to succeed, since we just verified that
+ * the key is not already in the dictionary. */
+ incrRefCount(key);
+ redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
+}
+
+/* This is a helper function for handleClientsBlockedOnLists(). Its job
+ * is to serve a specific client (receiver) that is blocked on 'key'
+ * in the context of the specified 'db', doing the following:
*
- * If the function returns 1 there was a client waiting for a list push
- * against this key, the element was passed to this client thus it's not
- * needed to actually add it to the list and the caller should return asap. */
-int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
- struct dictEntry *de;
- redisClient *receiver;
- int numclients;
- list *clients;
- listNode *ln;
- robj *dstkey, *dstobj;
-
- de = dictFind(c->db->blocking_keys,key);
- if (de == NULL) return 0;
- clients = dictGetVal(de);
- numclients = listLength(clients);
-
- /* Try to handle the push as long as there are clients waiting for a push.
- * Note that "numclients" is used because the list of clients waiting for a
- * push on "key" is deleted by unblockClient() when empty.
- *
- * This loop will have more than 1 iteration when there is a BRPOPLPUSH
- * that cannot push the target list because it does not contain a list. If
- * this happens, it simply tries the next client waiting for a push. */
- while (numclients--) {
- ln = listFirst(clients);
- redisAssertWithInfo(c,key,ln != NULL);
- receiver = ln->value;
- dstkey = receiver->bpop.target;
-
- /* Protect receiver->bpop.target, that will be freed by
- * the next unblockClientWaitingData() call. */
- if (dstkey) incrRefCount(dstkey);
-
- /* This should remove the first element of the "clients" list. */
- unblockClientWaitingData(receiver);
-
- if (dstkey == NULL) {
- /* BRPOP/BLPOP */
- addReplyMultiBulkLen(receiver,2);
- addReplyBulk(receiver,key);
- addReplyBulk(receiver,ele);
- return 1; /* Serve just the first client as in B[RL]POP semantics */
+ * 1) Provide the client with the 'value' element.
+ * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
+ * 'value' element on the destination list (the LPUSH side of the command).
+ * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
+ * the AOF and replication channel.
+ *
+ * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
+ * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
+ * we can propagate the command properly.
+ *
+ * The function returns REDIS_OK if we are able to serve the client, otherwise
+ * REDIS_ERR is returned to signal the caller that the list POP operation
+ * should be undone as the client was not served: this only happens for a
+ * BRPOPLPUSH that fails to push the value to the destination key because it
+ * is of the wrong type. */
+int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
+{
+ robj *argv[3];
+
+ if (dstkey == NULL) {
+ /* Propagate the [LR]POP operation: the blocking pop is propagated
+ * as a plain LPOP or RPOP against 'key', selected by 'where'. */
+ argv[0] = (where == REDIS_HEAD) ? shared.lpop :
+ shared.rpop;
+ argv[1] = key;
+ propagate((where == REDIS_HEAD) ?
+ server.lpopCommand : server.rpopCommand,
+ db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+
+ /* BRPOP/BLPOP: the reply is a two elements multi bulk with the
+ * name of the key and the popped value. */
+ addReplyMultiBulkLen(receiver,2);
+ addReplyBulk(receiver,key);
+ addReplyBulk(receiver,value);
+ } else {
+ /* BRPOPLPUSH: we can proceed only if the destination key does not
+ * exist, or if checkType() does not report a type mismatch for
+ * it. */
+ robj *dstobj =
+ lookupKeyWrite(receiver->db,dstkey);
+ if (!(dstobj &&
+ checkType(receiver,dstobj,REDIS_LIST)))
+ {
+ /* Propagate the RPOP operation. The whole BRPOPLPUSH is
+ * propagated as an RPOP against the source key followed by
+ * an LPUSH against the destination key. */
+ argv[0] = shared.rpop;
+ argv[1] = key;
+ propagate(server.rpopCommand,
+ db->id,argv,2,
+ REDIS_PROPAGATE_AOF|
+ REDIS_PROPAGATE_REPL);
+ rpoplpushHandlePush(receiver,dstkey,dstobj,
+ value);
+ /* Propagate the LPUSH operation. */
+ argv[0] = shared.lpush;
+ argv[1] = dstkey;
+ argv[2] = value;
+ propagate(server.lpushCommand,
+ db->id,argv,3,
+ REDIS_PROPAGATE_AOF|
+ REDIS_PROPAGATE_REPL);
} else {
- /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
- dstobj = lookupKeyWrite(receiver->db,dstkey);
- if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
- rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
- decrRefCount(dstkey);
- return 1;
- }
- decrRefCount(dstkey);
+ /* BRPOPLPUSH failed because of wrong
+ * destination type. */
+ return REDIS_ERR;
}
}
+ return REDIS_OK;
+}
- return 0;
+/* This function should be called by Redis every time a single command,
+ * a MULTI/EXEC block, or a Lua script, terminates its execution after
+ * being called by a client.
+ *
+ * All the keys with at least one client blocked that received at least
+ * one new element via some PUSH operation are accumulated into
+ * the server.ready_keys list. This function will run the list and will
+ * serve clients accordingly. Note that the function will iterate again and
+ * again, because as a result of serving BRPOPLPUSH we can have new blocking
+ * clients to serve because of the PUSH side of BRPOPLPUSH. */
+void handleClientsBlockedOnLists(void) {
+ while(listLength(server.ready_keys) != 0) {
+ list *l;
+
+ /* Point server.ready_keys to a fresh list and save the current one
+ * locally. This way as we run the old list we are free to call
+ * signalListAsReady() that may push new elements in server.ready_keys
+ * when handling clients blocked in BRPOPLPUSH. */
+ l = server.ready_keys;
+ server.ready_keys = listCreate();
+
+ while(listLength(l) != 0) {
+ listNode *ln = listFirst(l);
+ readyList *rl = ln->value;
+
+ /* First of all remove this key from db->ready_keys so that
+ * we can safely call signalListAsReady() against this key. */
+ dictDelete(rl->db->ready_keys,rl->key);
+
+ /* If the key exists and it's a list, serve blocked clients
+ * with data. */
+ robj *o = lookupKeyWrite(rl->db,rl->key);
+ if (o != NULL && o->type == REDIS_LIST) {
+ dictEntry *de;
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. For every
+ * client an element is popped from the head of the list if
+ * the last command of the client was BLPOP, from the tail
+ * otherwise. We stop as soon as there are no more elements
+ * to pop. The number of clients is sampled before looping
+ * (presumably because the clients list may be released
+ * while clients are unblocked -- to be confirmed). */
+ de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ int numclients = listLength(clients);
+
+ while(numclients--) {
+ listNode *clientnode = listFirst(clients);
+ redisClient *receiver = clientnode->value;
+ robj *dstkey = receiver->bpop.target;
+ int where = (receiver->lastcmd &&
+ receiver->lastcmd->proc == blpopCommand) ?
+ REDIS_HEAD : REDIS_TAIL;
+ robj *value = listTypePop(o,where);
+
+ if (value) {
+ /* Protect receiver->bpop.target, that will be
+ * freed by the next unblockClientWaitingData()
+ * call. */
+ if (dstkey) incrRefCount(dstkey);
+ unblockClientWaitingData(receiver);
+
+ if (serveClientBlockedOnList(receiver,
+ rl->key,dstkey,rl->db,value,
+ where) == REDIS_ERR)
+ {
+ /* If we failed serving the client we need
+ * to also undo the POP operation, pushing
+ * the value back on the same side it was
+ * popped from. */
+ listTypePush(o,value,where);
+ }
+
+ if (dstkey) decrRefCount(dstkey);
+ decrRefCount(value);
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
+ /* The key is deleted above if the list was left empty.
+ * We don't call signalModifiedKey() as it was already called
+ * when an element was pushed on the list. */
+ }
+
+ /* Free this item. */
+ decrRefCount(rl->key);
+ zfree(rl);
+ listDelNode(l,ln);
+ }
+ listRelease(l); /* We have the new list in place at this point. */
+ }
}
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
if (key == NULL) {
if (c->flags & REDIS_MULTI) {
-
/* Blocking against an empty list in a multi state
* returns immediately. */
addReply(c, shared.nullbulk);
if (key->type != REDIS_LIST) {
addReply(c, shared.wrongtypeerr);
} else {
-
/* The list exists and has elements, so
* the regular rpoplpushCommand is executed. */
redisAssertWithInfo(c,key,listTypeLength(key) > 0);