git.saurik.com Git - redis.git/blobdiff - src/t_list.c
Refactor code for BRPOPLPUSH.
[redis.git] / src / t_list.c
index add1bee167691b1fcc382c1292c728832f6c65bd..2dedf4811a4e619dc8d896126903c2741f186127 100644 (file)
@@ -260,6 +260,7 @@ void listTypeConvert(robj *subject, int enc) {
 
 void pushGenericCommand(redisClient *c, int where) {
     robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
+    c->argv[2] = tryObjectEncoding(c->argv[2]);
     if (lobj == NULL) {
         if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
             addReply(c,shared.cone);
@@ -342,18 +343,21 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
         server.dirty++;
     }
 
-    addReplyUlong(c,listTypeLength(subject));
+    addReplyLongLong(c,listTypeLength(subject));
 }
 
 void lpushxCommand(redisClient *c) {
+    c->argv[2] = tryObjectEncoding(c->argv[2]);
     pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
 }
 
 void rpushxCommand(redisClient *c) {
+    c->argv[2] = tryObjectEncoding(c->argv[2]);
     pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
 }
 
 void linsertCommand(redisClient *c) {
+    c->argv[4] = tryObjectEncoding(c->argv[4]);
     if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
         pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
     } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
@@ -366,7 +370,7 @@ void linsertCommand(redisClient *c) {
 void llenCommand(redisClient *c) {
     robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
-    addReplyUlong(c,listTypeLength(o));
+    addReplyLongLong(c,listTypeLength(o));
 }
 
 void lindexCommand(redisClient *c) {
@@ -409,7 +413,7 @@ void lsetCommand(redisClient *c) {
     robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
     int index = atoi(c->argv[2]->ptr);
-    robj *value = c->argv[3];
+    robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
 
     listTypeTryConversion(o,value);
     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
@@ -494,7 +498,7 @@ void lrangeCommand(redisClient *c) {
     rangelen = (end-start)+1;
 
     /* Return the result in form of a multi-bulk reply */
-    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
+    addReplyMultiBulkLen(c,rangelen);
     listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL);
     for (j = 0; j < rangelen; j++) {
         redisAssert(listTypeNext(li,&entry));
@@ -559,7 +563,8 @@ void ltrimCommand(redisClient *c) {
 }
 
 void lremCommand(redisClient *c) {
-    robj *subject, *obj = c->argv[3];
+    robj *subject, *obj;
+    obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
     int toremove = atoi(c->argv[2]->ptr);
     int removed = 0;
     listTypeEntry entry;
@@ -594,7 +599,7 @@ void lremCommand(redisClient *c) {
         decrRefCount(obj);
 
     if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
-    addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
+    addReplyLongLong(c,removed);
     if (removed) touchWatchedKey(c->db,c->argv[1]);
 }
 
@@ -613,7 +618,7 @@ void lremCommand(redisClient *c) {
  * since the element is not just returned but pushed against another list
  * as well. This command was originally proposed by Ezra Zygmuntowicz.
  */
-void rpoplpushcommand(redisClient *c) {
+void rpoplpushCommand(redisClient *c) {
     robj *sobj, *value;
     if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,sobj,REDIS_LIST)) return;
@@ -684,17 +689,23 @@ void rpoplpushcommand(redisClient *c) {
 
 /* Set a client in blocking mode for the specified key, with the specified
  * timeout */
-void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
+void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
     dictEntry *de;
     list *l;
     int j;
 
-    c->blocking_keys = zmalloc(sizeof(robj*)*numkeys);
-    c->blocking_keys_num = numkeys;
-    c->blockingto = timeout;
+    c->bstate.keys = zmalloc(sizeof(robj*)*numkeys);
+    c->bstate.count = numkeys;
+    c->bstate.timeout = timeout;
+    c->bstate.target = target;
+
+    if (target != NULL) {
+      incrRefCount(target);
+    }
+
     for (j = 0; j < numkeys; j++) {
         /* Add the key in the client structure, to map clients -> keys */
-        c->blocking_keys[j] = keys[j];
+        c->bstate.keys[j] = keys[j];
         incrRefCount(keys[j]);
 
         /* And in the other "side", to map keys -> clients */
@@ -723,22 +734,28 @@ void unblockClientWaitingData(redisClient *c) {
     list *l;
     int j;
 
-    redisAssert(c->blocking_keys != NULL);
+    redisAssert(c->bstate.keys != NULL);
     /* The client may wait for multiple keys, so unblock it for every key. */
-    for (j = 0; j < c->blocking_keys_num; j++) {
+    for (j = 0; j < c->bstate.count; j++) {
         /* Remove this client from the list of clients waiting for this key. */
-        de = dictFind(c->db->blocking_keys,c->blocking_keys[j]);
+        de = dictFind(c->db->blocking_keys,c->bstate.keys[j]);
         redisAssert(de != NULL);
         l = dictGetEntryVal(de);
         listDelNode(l,listSearchKey(l,c));
         /* If the list is empty we need to remove it to avoid wasting memory */
         if (listLength(l) == 0)
-            dictDelete(c->db->blocking_keys,c->blocking_keys[j]);
-        decrRefCount(c->blocking_keys[j]);
+            dictDelete(c->db->blocking_keys,c->bstate.keys[j]);
+        decrRefCount(c->bstate.keys[j]);
+    }
+
+    if (c->bstate.target != NULL) {
+        decrRefCount(c->bstate.target);
     }
+
     /* Cleanup the client structure */
-    zfree(c->blocking_keys);
-    c->blocking_keys = NULL;
+    zfree(c->bstate.keys);
+    c->bstate.keys = NULL;
+    c->bstate.target = NULL;
     c->flags &= (~REDIS_BLOCKED);
     server.blpop_blocked_clients--;
     /* We want to process data if there is some command waiting
@@ -772,9 +789,26 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
     redisAssert(ln != NULL);
     receiver = ln->value;
 
-    addReplySds(receiver,sdsnew("*2\r\n"));
-    addReplyBulk(receiver,key);
-    addReplyBulk(receiver,ele);
+    if (receiver->bstate.target == NULL) {
+      addReplyMultiBulkLen(receiver,2);
+      addReplyBulk(receiver,key);
+      addReplyBulk(receiver,ele);
+    }
+    else {
+      robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target);
+      if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
+
+      addReplyBulk(receiver,ele);
+
+      /* Create the list if the key does not exist */
+      if (!dobj) {
+          dobj = createZiplistObject();
+          dbAdd(receiver->db,receiver->bstate.target,dobj);
+      }
+
+      listTypePush(dobj,ele,REDIS_HEAD);
+    }
+
     unblockClientWaitingData(receiver);
     return 1;
 }
@@ -782,17 +816,10 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
 /* Blocking RPOP/LPOP */
 void blockingPopGenericCommand(redisClient *c, int where) {
     robj *o;
-    long long lltimeout;
     time_t timeout;
     int j;
 
-    /* Make sure timeout is an integer value */
-    if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout,
-            "timeout is not an integer") != REDIS_OK) return;
-
-    /* Make sure the timeout is not negative */
-    if (lltimeout < 0) {
-        addReplySds(c,sdsnew("-ERR timeout is negative\r\n"));
+    if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) {
         return;
     }
 
@@ -822,13 +849,15 @@ void blockingPopGenericCommand(redisClient *c, int where) {
                      * "real" command will add the last element (the value)
                      * for us. If this souds like an hack to you it's just
                      * because it is... */
-                    addReplySds(c,sdsnew("*2\r\n"));
+                    addReplyMultiBulkLen(c,2);
                     addReplyBulk(c,argv[1]);
+
                     popGenericCommand(c,where);
 
                     /* Fix the client structure with the original stuff */
                     c->argv = orig_argv;
                     c->argc = orig_argc;
+
                     return;
                 }
             }
@@ -843,9 +872,26 @@ void blockingPopGenericCommand(redisClient *c, int where) {
     }
 
     /* If the list is empty or the key does not exists we must block */
-    timeout = lltimeout;
     if (timeout > 0) timeout += time(NULL);
-    blockForKeys(c,c->argv+1,c->argc-2,timeout);
+    blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
+}
+/* Validate a timeout argument: REDIS_OK stores it in *timeout, REDIS_ERR means an error reply was sent to c. */
+int checkTimeout(redisClient *c, robj *object, time_t *timeout) {
+    long long lltimeout;
+    /* The argument must parse as an integer... */
+    if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) {
+      addReplyError(c, "timeout is not an integer");
+      return REDIS_ERR;
+    }
+    /* ...and must not be negative. */
+    if (lltimeout < 0) {
+      addReplyError(c, "timeout is negative");
+      return REDIS_ERR;
+    }
+    /* Only written on success; *timeout is left untouched on the error paths above. */
+    *timeout = lltimeout;
+
+    return REDIS_OK;
+}
 
 void blpopCommand(redisClient *c) {
@@ -855,3 +901,30 @@ void blpopCommand(redisClient *c) {
 void brpopCommand(redisClient *c) {
     blockingPopGenericCommand(c,REDIS_TAIL);
 }
+/* BRPOPLPUSH: argv[1] is the key looked up (and blocked on), argv[2] the target handed to blockForKeys, argv[3] the timeout. */
+void brpoplpushCommand(redisClient *c) {
+    time_t timeout;
+    /* On an invalid timeout checkTimeout has already sent the error reply, so just return. */
+    if (checkTimeout(c, c->argv[3], &timeout) != REDIS_OK) {
+        return;
+    }
+
+    robj *key = lookupKeyWrite(c->db, c->argv[1]);
+
+    /* Three cases follow: key missing, key holding a non-list value, or an existing list. */
+    if (key == NULL) {
+        /* Key missing: inside MULTI reply shared.nullmultibulk right away, otherwise block on the key. */
+        if (c->flags & REDIS_MULTI) {
+            addReply(c,shared.nullmultibulk);
+        } else {
+            if (timeout > 0) timeout += time(NULL); /* positive timeout becomes an absolute time; 0 is passed through unchanged (presumably "no timeout" - confirm) */
+            blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
+        }
+    } else if (key->type != REDIS_LIST) {
+        addReply(c, shared.wrongtypeerr);
+    } else {
+        /* Existing list: the assert checks it is non-empty, then rpoplpushCommand does the actual work. */
+        redisAssert(listTypeLength(key) > 0);
+        rpoplpushCommand(c);
+    }
+}