From 357a841714d50f3d3c293b9bb03839bac28cb05a Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Mon, 8 Nov 2010 19:38:01 -0300 Subject: [PATCH 1/1] Move to struct. --- src/networking.c | 8 +++++--- src/redis.h | 17 +++++++++++------ src/t_list.c | 36 ++++++++++++++++++------------------ 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/src/networking.c b/src/networking.c index 634e2107..530e2ca8 100644 --- a/src/networking.c +++ b/src/networking.c @@ -41,8 +41,10 @@ redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blocking_keys = NULL; - c->blocking_keys_num = 0; + c->bstate.keys = NULL; + c->bstate.count = 0; + c->bstate.timeout = 0; + c->bstate.target = NULL; c->io_keys = listCreate(); c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); @@ -677,7 +679,7 @@ void closeTimedoutClients(void) { redisLog(REDIS_VERBOSE,"Closing idle client"); freeClient(c); } else if (c->flags & REDIS_BLOCKED) { - if (c->blockingto != 0 && c->blockingto < now) { + if (c->bstate.timeout != 0 && c->bstate.timeout < now) { addReply(c,shared.nullmultibulk); unblockClientWaitingData(c); } diff --git a/src/redis.h b/src/redis.h index 83a94483..0de94585 100644 --- a/src/redis.h +++ b/src/redis.h @@ -293,6 +293,16 @@ typedef struct multiState { int count; /* Total number of MULTI commands */ } multiState; +typedef struct blockingState { + robj **keys; /* The key we are waiting to terminate a blocking + * operation such as BLPOP. Otherwise NULL. */ + int count; /* Number of blocking keys */ + time_t timeout; /* Blocking operation timeout. If UNIX current time + * is >= timeout then the operation timed out. */ + robj *target; /* The key that should receive the element, + * for BRPOPLPUSH. */ +} blockingState; + /* With multiplexing we need to take per-clinet state. * Clients are taken in a liked list. */ typedef struct redisClient { @@ -316,12 +326,7 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj **blocking_keys; /* The key we are waiting to terminate a blocking - * operation such as BLPOP. Otherwise NULL. */ - int blocking_keys_num; /* Number of blocking keys */ - time_t blockingto; /* Blocking operation timeout. If UNIX current time - * is >= blockingto then the operation timed out. */ - robj *blocking_target; + blockingState bstate; /* blocking state */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ diff --git a/src/t_list.c b/src/t_list.c index 437f4004..17015acf 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -694,12 +694,12 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { list *l; int j; - c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); - c->blocking_keys_num = numkeys; - c->blockingto = timeout; + c->bstate.keys = zmalloc(sizeof(robj*)*numkeys); + c->bstate.count = numkeys; + c->bstate.timeout = timeout; for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blocking_keys[j] = keys[j]; + c->bstate.keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ @@ -728,22 +728,22 @@ void unblockClientWaitingData(redisClient *c) { list *l; int j; - redisAssert(c->blocking_keys != NULL); + redisAssert(c->bstate.keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blocking_keys_num; j++) { + for (j = 0; j < c->bstate.count; j++) { /* Remove this client from the list of clients waiting for this key. */ - de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); + de = dictFind(c->db->blocking_keys,c->bstate.keys[j]); redisAssert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blocking_keys,c->blocking_keys[j]); - decrRefCount(c->blocking_keys[j]); + dictDelete(c->db->blocking_keys,c->bstate.keys[j]); + decrRefCount(c->bstate.keys[j]); } /* Cleanup the client structure */ - zfree(c->blocking_keys); - c->blocking_keys = NULL; + zfree(c->bstate.keys); + c->bstate.keys = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -777,7 +777,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - if (receiver->blocking_target == NULL) { + if (receiver->bstate.target == NULL) { addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,ele); @@ -785,7 +785,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { else { receiver->argc++; - robj *dobj = lookupKeyWrite(receiver->db,receiver->blocking_target); + robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; addReplyBulk(receiver,ele); @@ -793,7 +793,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { /* Create the list if the key does not exist */ if (!dobj) { dobj = createZiplistObject(); - dbAdd(receiver->db,receiver->blocking_target,dobj); + dbAdd(receiver->db,receiver->bstate.target,dobj); } listTypePush(dobj,ele,REDIS_HEAD); @@ -833,7 +833,7 @@ void blockingPopGenericCommand(redisClient *c, int where) { robj *argv[2], **orig_argv; int orig_argc; - if (c->blocking_target == NULL) { + if (c->bstate.target == NULL) { /* We need to alter the command arguments before to call * popGenericCommand() as the command takes a single key. */ orig_argv = c->argv; @@ -857,8 +857,8 @@ void blockingPopGenericCommand(redisClient *c, int where) { c->argc = orig_argc; } else { - c->argv[2] = c->blocking_target; - c->blocking_target = NULL; + c->argv[2] = c->bstate.target; + c->bstate.target = NULL; rpoplpushCommand(c); } @@ -891,7 +891,7 @@ void brpopCommand(redisClient *c) { } void brpoplpushCommand(redisClient *c) { - c->blocking_target = c->argv[2]; + c->bstate.target = c->argv[2]; c->argv[2] = c->argv[3]; c->argc--; -- 2.45.2