]> git.saurik.com Git - redis.git/commitdiff
Merge remote branch 'pietern/brpoplpush'
authorantirez <antirez@gmail.com>
Tue, 14 Dec 2010 15:26:37 +0000 (16:26 +0100)
committerantirez <antirez@gmail.com>
Tue, 14 Dec 2010 15:26:37 +0000 (16:26 +0100)
1  2 
src/networking.c
src/redis.h
src/t_list.c

diff --combined src/networking.c
index 90d157e15514016ae901a34f2474c8dc0c10b07f,7d14ac53f18bfc8c60ef457ff3b4c6383e3d59da..1dab8927586af844454abad521431a69437e7479
@@@ -41,8 -41,10 +41,10 @@@ redisClient *createClient(int fd) 
      c->reply = listCreate();
      listSetFreeMethod(c->reply,decrRefCount);
      listSetDupMethod(c->reply,dupClientReplyValue);
-     c->blocking_keys = NULL;
-     c->blocking_keys_num = 0;
+     c->bpop.keys = NULL;
+     c->bpop.count = 0;
+     c->bpop.timeout = 0;
+     c->bpop.target = NULL;
      c->io_keys = listCreate();
      c->watched_keys = listCreate();
      listSetFreeMethod(c->io_keys,decrRefCount);
@@@ -178,9 -180,6 +180,9 @@@ void addReply(redisClient *c, robj *obj
          if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
              _addReplyObjectToList(c,obj);
      } else {
 +        /* FIXME: convert the long into string and use _addReplyToBuffer()
 +         * instead of calling getDecodedObject. As this place in the
 +         * code is too performance critical. */
          obj = getDecodedObject(obj);
          if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
              _addReplyObjectToList(c,obj);
@@@ -278,7 -277,6 +280,7 @@@ void setDeferredMultiBulkLength(redisCl
      }
  }
  
 +/* Add a duble as a bulk reply */
  void addReplyDouble(redisClient *c, double d) {
      char dbuf[128], sbuf[128];
      int dlen, slen;
      addReplyString(c,sbuf,slen);
  }
  
 +/* Add a long long as integer reply or bulk len / multi bulk count.
 + * Basically this is used to output <prefix><long long><crlf>. */
  void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
      char buf[128];
      int len;
@@@ -307,7 -303,6 +309,7 @@@ void addReplyMultiBulkLen(redisClient *
      _addReplyLongLong(c,length,'*');
  }
  
 +/* Create the length prefix of a bulk reply, example: $2234 */
  void addReplyBulkLen(redisClient *c, robj *obj) {
      size_t len;
  
      _addReplyLongLong(c,len,'$');
  }
  
 +/* Add a Redis Object as a bulk reply */
  void addReplyBulk(redisClient *c, robj *obj) {
      addReplyBulkLen(c,obj);
      addReply(c,obj);
      addReply(c,shared.crlf);
  }
  
 -/* In the CONFIG command we need to add vanilla C string as bulk replies */
 +/* Add a C buffer as bulk reply */
 +void addReplyBulkCBuffer(redisClient *c, void *p, size_t len) {
 +    _addReplyLongLong(c,len,'$');
 +    addReplyString(c,p,len);
 +    addReply(c,shared.crlf);
 +}
 +
 +/* Add a C nul term string as bulk reply */
  void addReplyBulkCString(redisClient *c, char *s) {
      if (s == NULL) {
          addReply(c,shared.nullbulk);
      } else {
 -        robj *o = createStringObject(s,strlen(s));
 -        addReplyBulk(c,o);
 -        decrRefCount(o);
 +        addReplyBulkCBuffer(c,s,strlen(s));
      }
  }
  
 +/* Add a long long as a bulk reply */
 +void addReplyBulkLongLong(redisClient *c, long long ll) {
 +    char buf[64];
 +    int len;
 +
 +    len = ll2string(buf,64,ll);
 +    addReplyBulkCBuffer(c,buf,len);
 +}
 +
  static void acceptCommonHandler(int fd) {
      redisClient *c;
      if ((c = createClient(fd)) == NULL) {
@@@ -699,7 -679,7 +701,7 @@@ void closeTimedoutClients(void) 
              redisLog(REDIS_VERBOSE,"Closing idle client");
              freeClient(c);
          } else if (c->flags & REDIS_BLOCKED) {
-             if (c->blockingto != 0 && c->blockingto < now) {
+             if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
                  addReply(c,shared.nullmultibulk);
                  unblockClientWaitingData(c);
              }
diff --combined src/redis.h
index 67b76a47ba6fd303b68229301dd79d72a1a154dc,3639f0623d5d1cc3ad424837afe47ce416e8b4fc..860e77a18f40be8dcf718a4eb57b37397039ede8
@@@ -293,6 -293,16 +293,16 @@@ typedef struct multiState 
      int count;              /* Total number of MULTI commands */
  } multiState;
  
+ typedef struct blockingState {
+     robj **keys;            /* The key we are waiting to terminate a blocking
+                              * operation such as BLPOP. Otherwise NULL. */
+     int count;              /* Number of blocking keys */
+     time_t timeout;         /* Blocking operation timeout. If UNIX current time
+                              * is >= timeout then the operation timed out. */
+     robj *target;           /* The key that should receive the element,
+                              * for BRPOPLPUSH. */
+ } blockingState;
  /* With multiplexing we need to take per-clinet state.
   * Clients are taken in a liked list. */
  typedef struct redisClient {
      long repldboff;         /* replication DB file offset */
      off_t repldbsize;       /* replication DB file size */
      multiState mstate;      /* MULTI/EXEC state */
-     robj **blocking_keys;   /* The key we are waiting to terminate a blocking
-                              * operation such as BLPOP. Otherwise NULL. */
-     int blocking_keys_num;  /* Number of blocking keys */
-     time_t blockingto;      /* Blocking operation timeout. If UNIX current time
-                              * is >= blockingto then the operation timed out. */
+     blockingState bpop;   /* blocking state */
      list *io_keys;          /* Keys this client is waiting to be loaded from the
                               * swap file in order to continue. */
      list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
@@@ -427,8 -433,9 +433,9 @@@ struct redisServer 
      int maxmemory_policy;
      int maxmemory_samples;
      /* Blocked clients */
-     unsigned int blpop_blocked_clients;
+     unsigned int bpop_blocked_clients;
      unsigned int vm_blocked_clients;
+     list *unblocked_clients;
      /* Sort parameters - qsort_r() is only available under BSD so we
       * have to take this state global, in order to pass it to sortCompare() */
      int sort_desc;
@@@ -639,8 -646,6 +646,8 @@@ void acceptUnixHandler(aeEventLoop *el
  void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
  void addReplyBulk(redisClient *c, robj *obj);
  void addReplyBulkCString(redisClient *c, char *s);
 +void addReplyBulkCBuffer(redisClient *c, void *p, size_t len);
 +void addReplyBulkLongLong(redisClient *c, long long ll);
  void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
  void addReply(redisClient *c, robj *obj);
  void addReplySds(redisClient *c, sds s);
@@@ -813,9 -818,8 +820,9 @@@ int setTypeRemove(robj *subject, robj *
  int setTypeIsMember(robj *subject, robj *value);
  setTypeIterator *setTypeInitIterator(robj *subject);
  void setTypeReleaseIterator(setTypeIterator *si);
 -robj *setTypeNext(setTypeIterator *si);
 -robj *setTypeRandomElement(robj *subject);
 +int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele);
 +robj *setTypeNextObject(setTypeIterator *si);
 +int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele);
  unsigned long setTypeSize(robj *subject);
  void setTypeConvert(robj *subject, int enc);
  
  void convertToRealHash(robj *o);
  void hashTypeTryConversion(robj *subject, robj **argv, int start, int end);
  void hashTypeTryObjectEncoding(robj *subject, robj **o1, robj **o2);
 -robj *hashTypeGet(robj *o, robj *key);
 +int hashTypeGet(robj *o, robj *key, robj **objval, unsigned char **v, unsigned int *vlen);
 +robj *hashTypeGetObject(robj *o, robj *key);
  int hashTypeExists(robj *o, robj *key);
  int hashTypeSet(robj *o, robj *key, robj *value);
  int hashTypeDelete(robj *o, robj *key);
@@@ -832,8 -835,7 +839,8 @@@ unsigned long hashTypeLength(robj *o)
  hashTypeIterator *hashTypeInitIterator(robj *subject);
  void hashTypeReleaseIterator(hashTypeIterator *hi);
  int hashTypeNext(hashTypeIterator *hi);
 -robj *hashTypeCurrent(hashTypeIterator *hi, int what);
 +int hashTypeCurrent(hashTypeIterator *hi, int what, robj **objval, unsigned char **v, unsigned int *vlen);
 +robj *hashTypeCurrentObject(hashTypeIterator *hi, int what);
  robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key);
  
  /* Pub / Sub */
@@@ -937,7 -939,7 +944,7 @@@ void flushdbCommand(redisClient *c)
  void flushallCommand(redisClient *c);
  void sortCommand(redisClient *c);
  void lremCommand(redisClient *c);
- void rpoplpushcommand(redisClient *c);
+ void rpoplpushCommand(redisClient *c);
  void infoCommand(redisClient *c);
  void mgetCommand(redisClient *c);
  void monitorCommand(redisClient *c);
@@@ -966,6 -968,7 +973,7 @@@ void execCommand(redisClient *c)
  void discardCommand(redisClient *c);
  void blpopCommand(redisClient *c);
  void brpopCommand(redisClient *c);
+ void brpoplpushCommand(redisClient *c);
  void appendCommand(redisClient *c);
  void substrCommand(redisClient *c);
  void strlenCommand(redisClient *c);
diff --combined src/t_list.c
index d1ec3db9e343e0202411f2e21afb051e1d8a4df4,7dc3f13934aa612e67abaad94c9a0671c87b11b0..8ee3b987215a059a73c9900c152383c7988a90a3
@@@ -472,11 -472,12 +472,11 @@@ void rpopCommand(redisClient *c) 
  }
  
  void lrangeCommand(redisClient *c) {
 -    robj *o, *value;
 +    robj *o;
      int start = atoi(c->argv[2]->ptr);
      int end = atoi(c->argv[3]->ptr);
      int llen;
 -    int rangelen, j;
 -    listTypeEntry entry;
 +    int rangelen;
  
      if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
           || checkType(c,o,REDIS_LIST)) return;
  
      /* Return the result in form of a multi-bulk reply */
      addReplyMultiBulkLen(c,rangelen);
 -    listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL);
 -    for (j = 0; j < rangelen; j++) {
 -        redisAssert(listTypeNext(li,&entry));
 -        value = listTypeGet(&entry);
 -        addReplyBulk(c,value);
 -        decrRefCount(value);
 +    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
 +        unsigned char *p = ziplistIndex(o->ptr,start);
 +        unsigned char *vstr;
 +        unsigned int vlen;
 +        long long vlong;
 +
 +        while(rangelen--) {
 +            ziplistGet(p,&vstr,&vlen,&vlong);
 +            if (vstr) {
 +                addReplyBulkCBuffer(c,vstr,vlen);
 +            } else {
 +                addReplyBulkLongLong(c,vlong);
 +            }
 +            p = ziplistNext(o->ptr,p);
 +        }
 +    } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
 +        listNode *ln = listIndex(o->ptr,start);
 +
 +        while(rangelen--) {
 +            addReplyBulk(c,ln->value);
 +            ln = ln->next;
 +        }
 +    } else {
 +        redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
      }
 -    listTypeReleaseIterator(li);
  }
  
  void ltrimCommand(redisClient *c) {
@@@ -621,20 -605,38 +621,38 @@@ void lremCommand(redisClient *c) 
  
  /* This is the semantic of this command:
   *  RPOPLPUSH srclist dstlist:
-  *   IF LLEN(srclist) > 0
-  *     element = RPOP srclist
-  *     LPUSH dstlist element
-  *     RETURN element
-  *   ELSE
-  *     RETURN nil
-  *   END
+  *    IF LLEN(srclist) > 0
+  *      element = RPOP srclist
+  *      LPUSH dstlist element
+  *      RETURN element
+  *    ELSE
+  *      RETURN nil
+  *    END
   *  END
   *
   * The idea is to be able to get an element from a list in a reliable way
   * since the element is not just returned but pushed against another list
   * as well. This command was originally proposed by Ezra Zygmuntowicz.
   */
- void rpoplpushcommand(redisClient *c) {
+ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+     if (!handleClientsWaitingListPush(c,dstkey,value)) {
+         /* Create the list if the key does not exist */
+         if (!dstobj) {
+             dstobj = createZiplistObject();
+             dbAdd(c->db,dstkey,dstobj);
+         } else {
+             touchWatchedKey(c->db,dstkey);
+             server.dirty++;
+         }
+         listTypePush(dstobj,value,REDIS_HEAD);
+     }
+     /* Always send the pushed value to the client. */
+     addReplyBulk(c,value);
+ }
+ void rpoplpushCommand(redisClient *c) {
      robj *sobj, *value;
      if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
          checkType(c,sobj,REDIS_LIST)) return;
          robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
          if (dobj && checkType(c,dobj,REDIS_LIST)) return;
          value = listTypePop(sobj,REDIS_TAIL);
-         /* Add the element to the target list (unless it's directly
-          * passed to some BLPOP-ing client */
-         if (!handleClientsWaitingListPush(c,c->argv[2],value)) {
-             /* Create the list if the key does not exist */
-             if (!dobj) {
-                 dobj = createZiplistObject();
-                 dbAdd(c->db,c->argv[2],dobj);
-             }
-             listTypePush(dobj,value,REDIS_HEAD);
-         }
-         /* Send the element to the client as reply as well */
-         addReplyBulk(c,value);
+         rpoplpushHandlePush(c,c->argv[2],dobj,value);
  
          /* listTypePop returns an object with its refcount incremented */
          decrRefCount(value);
  
  /* Set a client in blocking mode for the specified key, with the specified
   * timeout */
- void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
+ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
      dictEntry *de;
      list *l;
      int j;
  
-     c->blocking_keys = zmalloc(sizeof(robj*)*numkeys);
-     c->blocking_keys_num = numkeys;
-     c->blockingto = timeout;
+     c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
+     c->bpop.count = numkeys;
+     c->bpop.timeout = timeout;
+     c->bpop.target = target;
+     if (target != NULL) {
+         incrRefCount(target);
+     }
      for (j = 0; j < numkeys; j++) {
          /* Add the key in the client structure, to map clients -> keys */
-         c->blocking_keys[j] = keys[j];
+         c->bpop.keys[j] = keys[j];
          incrRefCount(keys[j]);
  
          /* And in the other "side", to map keys -> clients */
      }
      /* Mark the client as a blocked client */
      c->flags |= REDIS_BLOCKED;
-     server.blpop_blocked_clients++;
+     server.bpop_blocked_clients++;
  }
  
  /* Unblock a client that's waiting in a blocking operation such as BLPOP */
@@@ -744,30 -739,27 +755,27 @@@ void unblockClientWaitingData(redisClie
      list *l;
      int j;
  
-     redisAssert(c->blocking_keys != NULL);
+     redisAssert(c->bpop.keys != NULL);
      /* The client may wait for multiple keys, so unblock it for every key. */
-     for (j = 0; j < c->blocking_keys_num; j++) {
+     for (j = 0; j < c->bpop.count; j++) {
          /* Remove this client from the list of clients waiting for this key. */
-         de = dictFind(c->db->blocking_keys,c->blocking_keys[j]);
+         de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
          redisAssert(de != NULL);
          l = dictGetEntryVal(de);
          listDelNode(l,listSearchKey(l,c));
          /* If the list is empty we need to remove it to avoid wasting memory */
          if (listLength(l) == 0)
-             dictDelete(c->db->blocking_keys,c->blocking_keys[j]);
-         decrRefCount(c->blocking_keys[j]);
+             dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
+         decrRefCount(c->bpop.keys[j]);
      }
      /* Cleanup the client structure */
-     zfree(c->blocking_keys);
-     c->blocking_keys = NULL;
+     zfree(c->bpop.keys);
+     c->bpop.keys = NULL;
+     c->bpop.target = NULL;
      c->flags &= (~REDIS_BLOCKED);
-     server.blpop_blocked_clients--;
-     /* We want to process data if there is some command waiting
-      * in the input buffer. Note that this is safe even if
-      * unblockClientWaitingData() gets called from freeClient() because
-      * freeClient() will be smart enough to call this function
-      * *after* c->querybuf was set to NULL. */
-     if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
+     server.bpop_blocked_clients--;
+     listAddNodeTail(server.unblocked_clients,c);
  }
  
  /* This should be called from any function PUSHing into lists.
  int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
      struct dictEntry *de;
      redisClient *receiver;
-     list *l;
+     int numclients;
+     list *clients;
      listNode *ln;
+     robj *dstkey, *dstobj;
  
      de = dictFind(c->db->blocking_keys,key);
      if (de == NULL) return 0;
-     l = dictGetEntryVal(de);
-     ln = listFirst(l);
-     redisAssert(ln != NULL);
-     receiver = ln->value;
+     clients = dictGetEntryVal(de);
+     numclients = listLength(clients);
+     /* Try to handle the push as long as there are clients waiting for a push.
+      * Note that "numclients" is used because the list of clients waiting for a
+      * push on "key" is deleted by unblockClient() when empty.
+      *
+      * This loop will have more than 1 iteration when there is a BRPOPLPUSH
+      * that cannot push the target list because it does not contain a list. If
+      * this happens, it simply tries the next client waiting for a push. */
+     while (numclients--) {
+         ln = listFirst(clients);
+         redisAssert(ln != NULL);
+         receiver = ln->value;
+         dstkey = receiver->bpop.target;
+         /* This should remove the first element of the "clients" list. */
+         unblockClientWaitingData(receiver);
+         redisAssert(ln != listFirst(clients));
+         if (dstkey == NULL) {
+             /* BRPOP/BLPOP */
+             addReplyMultiBulkLen(receiver,2);
+             addReplyBulk(receiver,key);
+             addReplyBulk(receiver,ele);
+             return 1;
+         } else {
+             /* BRPOPLPUSH */
+             dstobj = lookupKeyWrite(receiver->db,dstkey);
+             if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
+                 decrRefCount(dstkey);
+             } else {
+                 rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
+                 decrRefCount(dstkey);
+                 return 1;
+             }
+         }
+     }
  
-     addReplyMultiBulkLen(receiver,2);
-     addReplyBulk(receiver,key);
-     addReplyBulk(receiver,ele);
-     unblockClientWaitingData(receiver);
-     return 1;
+     return 0;
+ }
+ int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
+     long tval;
+     if (getLongFromObjectOrReply(c,object,&tval,
+         "timeout is not an integer or out of range") != REDIS_OK)
+         return REDIS_ERR;
+     if (tval < 0) {
+         addReplyError(c,"timeout is negative");
+         return REDIS_ERR;
+     }
+     if (tval > 0) tval += time(NULL);
+     *timeout = tval;
+     return REDIS_OK;
  }
  
  /* Blocking RPOP/LPOP */
  void blockingPopGenericCommand(redisClient *c, int where) {
      robj *o;
-     long long lltimeout;
      time_t timeout;
      int j;
  
-     /* Make sure timeout is an integer value */
-     if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout,
-             "timeout is not an integer") != REDIS_OK) return;
-     /* Make sure the timeout is not negative */
-     if (lltimeout < 0) {
-         addReplyError(c,"timeout is negative");
+     if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
          return;
-     }
  
      for (j = 1; j < c->argc-1; j++) {
          o = lookupKeyWrite(c->db,c->argv[j]);
                       * because it is... */
                      addReplyMultiBulkLen(c,2);
                      addReplyBulk(c,argv[1]);
                      popGenericCommand(c,where);
  
                      /* Fix the client structure with the original stuff */
                      c->argv = orig_argv;
                      c->argc = orig_argc;
                      return;
                  }
              }
      }
  
      /* If the list is empty or the key does not exists we must block */
-     timeout = lltimeout;
-     if (timeout > 0) timeout += time(NULL);
-     blockForKeys(c,c->argv+1,c->argc-2,timeout);
+     blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
  }
  
  void blpopCommand(redisClient *c) {
  void brpopCommand(redisClient *c) {
      blockingPopGenericCommand(c,REDIS_TAIL);
  }
+ void brpoplpushCommand(redisClient *c) {
+     time_t timeout;
+     if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
+         return;
+     robj *key = lookupKeyWrite(c->db, c->argv[1]);
+     if (key == NULL) {
+         if (c->flags & REDIS_MULTI) {
+             /* Blocking against an empty list in a multi state
+              * returns immediately. */
+             addReply(c, shared.nullmultibulk);
+         } else {
+             /* The list is empty and the client blocks. */
+             blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
+         }
+     } else {
+         if (key->type != REDIS_LIST) {
+             addReply(c, shared.wrongtypeerr);
+         } else {
+             /* The list exists and has elements, so
+              * the regular rpoplpushCommand is executed. */
+             redisAssert(listTypeLength(key) > 0);
+             rpoplpushCommand(c);
+         }
+     }
+ }