]> git.saurik.com Git - redis.git/commitdiff
Merge branch 'master' of github.com:antirez/redis
authorantirez <antirez@gmail.com>
Tue, 14 Dec 2010 16:42:46 +0000 (17:42 +0100)
committerantirez <antirez@gmail.com>
Tue, 14 Dec 2010 16:42:46 +0000 (17:42 +0100)
1  2 
src/redis.c
src/redis.h

diff --combined src/redis.c
index 035ccea8c7cc7d0e48f3996453db3f5ce6ae6714,a1653c36e458ae8683400650384031283a2ed194..208a3332f591f5b82a95420dc67cb838ff06e3ae
@@@ -74,14 -74,10 +74,14 @@@ struct redisCommand readonlyCommandTabl
      {"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0},
      {"setex",setexCommand,4,REDIS_CMD_DENYOOM,NULL,0,0,0},
      {"append",appendCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
 -    {"substr",substrCommand,4,0,NULL,1,1,1},
      {"strlen",strlenCommand,2,0,NULL,1,1,1},
      {"del",delCommand,-2,0,NULL,0,0,0},
      {"exists",existsCommand,2,0,NULL,1,1,1},
 +    {"setbit",setbitCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1},
 +    {"getbit",getbitCommand,3,0,NULL,1,1,1},
 +    {"setrange",setrangeCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1},
 +    {"getrange",getrangeCommand,4,0,NULL,1,1,1},
 +    {"substr",getrangeCommand,4,0,NULL,1,1,1},
      {"incr",incrCommand,2,REDIS_CMD_DENYOOM,NULL,1,1,1},
      {"decr",decrCommand,2,REDIS_CMD_DENYOOM,NULL,1,1,1},
      {"mget",mgetCommand,-2,0,NULL,1,-1,1},
@@@ -93,6 -89,7 +93,7 @@@
      {"rpop",rpopCommand,2,0,NULL,1,1,1},
      {"lpop",lpopCommand,2,0,NULL,1,1,1},
      {"brpop",brpopCommand,-3,0,NULL,1,1,1},
+     {"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1},
      {"blpop",blpopCommand,-3,0,NULL,1,1,1},
      {"llen",llenCommand,2,0,NULL,1,1,1},
      {"lindex",lindexCommand,3,0,NULL,1,1,1},
      {"lrange",lrangeCommand,4,0,NULL,1,1,1},
      {"ltrim",ltrimCommand,4,0,NULL,1,1,1},
      {"lrem",lremCommand,4,0,NULL,1,1,1},
-     {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1},
+     {"rpoplpush",rpoplpushCommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1},
      {"sadd",saddCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
      {"srem",sremCommand,3,0,NULL,1,1,1},
      {"smove",smoveCommand,4,0,NULL,1,2,1},
@@@ -576,7 -573,7 +577,7 @@@ int serverCron(struct aeEventLoop *even
      }
  
      /* Close connections of timedout clients */
-     if ((server.maxidletime && !(loops % 100)) || server.blpop_blocked_clients)
+     if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
          closeTimedoutClients();
  
      /* Check if a background saving or AOF rewrite in progress terminated */
   * for ready file descriptors. */
  void beforeSleep(struct aeEventLoop *eventLoop) {
      REDIS_NOTUSED(eventLoop);
+     listNode *ln;
+     redisClient *c;
  
      /* Awake clients that got all the swapped keys they requested */
      if (server.vm_enabled && listLength(server.io_ready_clients)) {
          listIter li;
-         listNode *ln;
  
          listRewind(server.io_ready_clients,&li);
          while((ln = listNext(&li))) {
-             redisClient *c = ln->value;
+             c = ln->value;
              struct redisCommand *cmd;
  
              /* Resume the client. */
                  processInputBuffer(c);
          }
      }
+     /* Try to process pending commands for clients that were just unblocked. */
+     while (listLength(server.unblocked_clients)) {
+         ln = listFirst(server.unblocked_clients);
+         redisAssert(ln != NULL);
+         c = ln->value;
+         listDelNode(server.unblocked_clients,ln);
+         /* Process remaining data in the input buffer. */
+         if (c->querybuf && sdslen(c->querybuf) > 0)
+             processInputBuffer(c);
+     }
      /* Write the AOF buffer on disk */
      flushAppendOnlyFile();
  }
@@@ -762,7 -773,7 +777,7 @@@ void initServerConfig() 
      server.rdbcompression = 1;
      server.activerehashing = 1;
      server.maxclients = 0;
-     server.blpop_blocked_clients = 0;
+     server.bpop_blocked_clients = 0;
      server.maxmemory = 0;
      server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
      server.maxmemory_samples = 3;
@@@ -821,6 -832,7 +836,7 @@@ void initServer() 
      server.clients = listCreate();
      server.slaves = listCreate();
      server.monitors = listCreate();
+     server.unblocked_clients = listCreate();
      createSharedObjects();
      server.el = aeCreateEventLoop();
      server.db = zmalloc(sizeof(redisDb)*server.dbnum);
@@@ -1173,7 -1185,7 +1189,7 @@@ sds genRedisInfoString(void) 
          (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
          listLength(server.clients)-listLength(server.slaves),
          listLength(server.slaves),
-         server.blpop_blocked_clients,
+         server.bpop_blocked_clients,
          zmalloc_used_memory(),
          hmem,
          zmalloc_get_rss(),
diff --combined src/redis.h
index 6e2112a0d51065ca4c51e7e806e3f1e66884ef87,860e77a18f40be8dcf718a4eb57b37397039ede8..4a6af106b92b3a4168b2a09062a1bcc0c09d35de
@@@ -293,6 -293,16 +293,16 @@@ typedef struct multiState 
      int count;              /* Total number of MULTI commands */
  } multiState;
  
+ typedef struct blockingState {
+     robj **keys;            /* The key we are waiting to terminate a blocking
+                              * operation such as BLPOP. Otherwise NULL. */
+     int count;              /* Number of blocking keys */
+     time_t timeout;         /* Blocking operation timeout. If UNIX current time
+                              * is >= timeout then the operation timed out. */
+     robj *target;           /* The key that should receive the element,
+                              * for BRPOPLPUSH. */
+ } blockingState;
  /* With multiplexing we need to take per-clinet state.
   * Clients are taken in a liked list. */
  typedef struct redisClient {
      long repldboff;         /* replication DB file offset */
      off_t repldbsize;       /* replication DB file size */
      multiState mstate;      /* MULTI/EXEC state */
-     robj **blocking_keys;   /* The key we are waiting to terminate a blocking
-                              * operation such as BLPOP. Otherwise NULL. */
-     int blocking_keys_num;  /* Number of blocking keys */
-     time_t blockingto;      /* Blocking operation timeout. If UNIX current time
-                              * is >= blockingto then the operation timed out. */
+     blockingState bpop;   /* blocking state */
      list *io_keys;          /* Keys this client is waiting to be loaded from the
                               * swap file in order to continue. */
      list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
@@@ -427,8 -433,9 +433,9 @@@ struct redisServer 
      int maxmemory_policy;
      int maxmemory_samples;
      /* Blocked clients */
-     unsigned int blpop_blocked_clients;
+     unsigned int bpop_blocked_clients;
      unsigned int vm_blocked_clients;
+     list *unblocked_clients;
      /* Sort parameters - qsort_r() is only available under BSD so we
       * have to take this state global, in order to pass it to sortCompare() */
      int sort_desc;
@@@ -890,10 -897,6 +897,10 @@@ void setexCommand(redisClient *c)
  void getCommand(redisClient *c);
  void delCommand(redisClient *c);
  void existsCommand(redisClient *c);
 +void setbitCommand(redisClient *c);
 +void getbitCommand(redisClient *c);
 +void setrangeCommand(redisClient *c);
 +void getrangeCommand(redisClient *c);
  void incrCommand(redisClient *c);
  void decrCommand(redisClient *c);
  void incrbyCommand(redisClient *c);
@@@ -941,7 -944,7 +948,7 @@@ void flushdbCommand(redisClient *c)
  void flushallCommand(redisClient *c);
  void sortCommand(redisClient *c);
  void lremCommand(redisClient *c);
- void rpoplpushcommand(redisClient *c);
+ void rpoplpushCommand(redisClient *c);
  void infoCommand(redisClient *c);
  void mgetCommand(redisClient *c);
  void monitorCommand(redisClient *c);
@@@ -970,7 -973,9 +977,8 @@@ void execCommand(redisClient *c)
  void discardCommand(redisClient *c);
  void blpopCommand(redisClient *c);
  void brpopCommand(redisClient *c);
+ void brpoplpushCommand(redisClient *c);
  void appendCommand(redisClient *c);
 -void substrCommand(redisClient *c);
  void strlenCommand(redisClient *c);
  void zrankCommand(redisClient *c);
  void zrevrankCommand(redisClient *c);