c->reply = listCreate();
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
- c->blocking_keys = NULL;
- c->blocking_keys_num = 0;
+ c->bstate.keys = NULL;
+ c->bstate.count = 0;
+ c->bstate.timeout = 0;
+ c->bstate.target = NULL;
c->io_keys = listCreate();
c->watched_keys = listCreate();
listSetFreeMethod(c->io_keys,decrRefCount);
redisLog(REDIS_VERBOSE,"Closing idle client");
freeClient(c);
} else if (c->flags & REDIS_BLOCKED) {
- if (c->blockingto != 0 && c->blockingto < now) {
+ if (c->bstate.timeout != 0 && c->bstate.timeout < now) {
addReply(c,shared.nullmultibulk);
unblockClientWaitingData(c);
}
int count; /* Total number of MULTI commands */
} multiState;
+typedef struct blockingState { /* Per-client state of a blocking operation
+ * such as BLPOP or BRPOPLPUSH. */
+ robj **keys; /* Array of the keys the client is blocked on, waiting
+ * for one of them to terminate a blocking operation
+ * such as BLPOP. NULL when the client is not blocked. */
+ int count; /* Number of entries in the keys array */
+ time_t timeout; /* Blocking operation timeout as a UNIX time. Zero
+ * disables the timeout check; otherwise the operation
+ * times out once the current time is greater than
+ * timeout. */
+ robj *target; /* The key that should receive the element, for
+ * BRPOPLPUSH. NULL for blocking operations that have
+ * no destination key. */
+} blockingState;
+
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list. */
typedef struct redisClient {
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
- robj **blocking_keys; /* The key we are waiting to terminate a blocking
- * operation such as BLPOP. Otherwise NULL. */
- int blocking_keys_num; /* Number of blocking keys */
- time_t blockingto; /* Blocking operation timeout. If UNIX current time
- * is >= blockingto then the operation timed out. */
- robj *blocking_target;
+ blockingState bstate; /* blocking state */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
list *l;
int j;
- c->blocking_keys = zmalloc(sizeof(robj*)*numkeys);
- c->blocking_keys_num = numkeys;
- c->blockingto = timeout;
+ c->bstate.keys = zmalloc(sizeof(robj*)*numkeys);
+ c->bstate.count = numkeys;
+ c->bstate.timeout = timeout;
for (j = 0; j < numkeys; j++) {
/* Add the key in the client structure, to map clients -> keys */
- c->blocking_keys[j] = keys[j];
+ c->bstate.keys[j] = keys[j];
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
list *l;
int j;
- redisAssert(c->blocking_keys != NULL);
+ redisAssert(c->bstate.keys != NULL);
/* The client may wait for multiple keys, so unblock it for every key. */
- for (j = 0; j < c->blocking_keys_num; j++) {
+ for (j = 0; j < c->bstate.count; j++) {
/* Remove this client from the list of clients waiting for this key. */
- de = dictFind(c->db->blocking_keys,c->blocking_keys[j]);
+ de = dictFind(c->db->blocking_keys,c->bstate.keys[j]);
redisAssert(de != NULL);
l = dictGetEntryVal(de);
listDelNode(l,listSearchKey(l,c));
/* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0)
- dictDelete(c->db->blocking_keys,c->blocking_keys[j]);
- decrRefCount(c->blocking_keys[j]);
+ dictDelete(c->db->blocking_keys,c->bstate.keys[j]);
+ decrRefCount(c->bstate.keys[j]);
}
/* Cleanup the client structure */
- zfree(c->blocking_keys);
- c->blocking_keys = NULL;
+ zfree(c->bstate.keys);
+ c->bstate.keys = NULL;
c->flags &= (~REDIS_BLOCKED);
server.blpop_blocked_clients--;
/* We want to process data if there is some command waiting
redisAssert(ln != NULL);
receiver = ln->value;
- if (receiver->blocking_target == NULL) {
+ if (receiver->bstate.target == NULL) {
addReplyMultiBulkLen(receiver,2);
addReplyBulk(receiver,key);
addReplyBulk(receiver,ele);
else {
receiver->argc++;
- robj *dobj = lookupKeyWrite(receiver->db,receiver->blocking_target);
+ robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target);
if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
addReplyBulk(receiver,ele);
/* Create the list if the key does not exist */
if (!dobj) {
dobj = createZiplistObject();
- dbAdd(receiver->db,receiver->blocking_target,dobj);
+ dbAdd(receiver->db,receiver->bstate.target,dobj);
}
listTypePush(dobj,ele,REDIS_HEAD);
robj *argv[2], **orig_argv;
int orig_argc;
- if (c->blocking_target == NULL) {
+ if (c->bstate.target == NULL) {
/* We need to alter the command arguments before to call
* popGenericCommand() as the command takes a single key. */
orig_argv = c->argv;
c->argc = orig_argc;
}
else {
- c->argv[2] = c->blocking_target;
- c->blocking_target = NULL;
+ c->argv[2] = c->bstate.target;
+ c->bstate.target = NULL;
rpoplpushCommand(c);
}
}
void brpoplpushCommand(redisClient *c) {
- c->blocking_target = c->argv[2];
+ c->bstate.target = c->argv[2];
c->argv[2] = c->argv[3];
c->argc--;