]> git.saurik.com Git - redis.git/blobdiff - src/t_list.c
Variadic ZADD
[redis.git] / src / t_list.c
index 867e258a1e22ef6add9ca45925e381aebe55c75c..7c1b848a814b3a9cc08ceb61e615d3eb5e167744 100644 (file)
@@ -259,30 +259,35 @@ void listTypeConvert(robj *subject, int enc) {
  *----------------------------------------------------------------------------*/
 
 void pushGenericCommand(redisClient *c, int where) {
  *----------------------------------------------------------------------------*/
 
 void pushGenericCommand(redisClient *c, int where) {
+    int j, addlen = 0, pushed = 0;
     robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
     robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
-    c->argv[2] = tryObjectEncoding(c->argv[2]);
-    if (lobj == NULL) {
-        if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
-            addReply(c,shared.cone);
-            return;
-        }
-        lobj = createZiplistObject();
-        dbAdd(c->db,c->argv[1],lobj);
-    } else {
-        if (lobj->type != REDIS_LIST) {
-            addReply(c,shared.wrongtypeerr);
-            return;
+    int may_have_waiting_clients = (lobj == NULL);
+
+    if (lobj && lobj->type != REDIS_LIST) {
+        addReply(c,shared.wrongtypeerr);
+        return;
+    }
+
+    for (j = 2; j < c->argc; j++) {
+        c->argv[j] = tryObjectEncoding(c->argv[j]);
+        if (may_have_waiting_clients) {
+            if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
+                addlen++;
+                continue;
+            } else {
+                may_have_waiting_clients = 0;
+            }
         }
         }
-        if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
-            touchWatchedKey(c->db,c->argv[1]);
-            addReply(c,shared.cone);
-            return;
+        if (!lobj) {
+            lobj = createZiplistObject();
+            dbAdd(c->db,c->argv[1],lobj);
         }
         }
+        listTypePush(lobj,c->argv[j],where);
+        pushed++;
     }
     }
-    listTypePush(lobj,c->argv[2],where);
-    addReplyLongLong(c,listTypeLength(lobj));
-    touchWatchedKey(c->db,c->argv[1]);
-    server.dirty++;
+    addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
+    if (pushed) signalModifiedKey(c->db,c->argv[1]);
+    server.dirty += pushed;
 }
 
 void lpushCommand(redisClient *c) {
 }
 
 void lpushCommand(redisClient *c) {
@@ -330,7 +335,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
             if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
                 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
                     listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
             if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
                 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
                     listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             server.dirty++;
         } else {
             /* Notify client of a failed insert */
             server.dirty++;
         } else {
             /* Notify client of a failed insert */
@@ -339,7 +344,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
         }
     } else {
         listTypePush(subject,val,where);
         }
     } else {
         listTypePush(subject,val,where);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 
         server.dirty++;
     }
 
@@ -427,7 +432,7 @@ void lsetCommand(redisClient *c) {
             o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
             decrRefCount(value);
             addReply(c,shared.ok);
             o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
             decrRefCount(value);
             addReply(c,shared.ok);
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             server.dirty++;
         }
     } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
             server.dirty++;
         }
     } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
@@ -439,7 +444,7 @@ void lsetCommand(redisClient *c) {
             listNodeValue(ln) = value;
             incrRefCount(value);
             addReply(c,shared.ok);
             listNodeValue(ln) = value;
             incrRefCount(value);
             addReply(c,shared.ok);
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             server.dirty++;
         }
     } else {
             server.dirty++;
         }
     } else {
@@ -458,7 +463,7 @@ void popGenericCommand(redisClient *c, int where) {
         addReplyBulk(c,value);
         decrRefCount(value);
         if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
         addReplyBulk(c,value);
         decrRefCount(value);
         if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
         server.dirty++;
     }
 }
@@ -472,12 +477,11 @@ void rpopCommand(redisClient *c) {
 }
 
 void lrangeCommand(redisClient *c) {
 }
 
 void lrangeCommand(redisClient *c) {
-    robj *o, *value;
+    robj *o;
     int start = atoi(c->argv[2]->ptr);
     int end = atoi(c->argv[3]->ptr);
     int llen;
     int start = atoi(c->argv[2]->ptr);
     int end = atoi(c->argv[3]->ptr);
     int llen;
-    int rangelen, j;
-    listTypeEntry entry;
+    int rangelen;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
          || checkType(c,o,REDIS_LIST)) return;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
          || checkType(c,o,REDIS_LIST)) return;
@@ -499,14 +503,31 @@ void lrangeCommand(redisClient *c) {
 
     /* Return the result in form of a multi-bulk reply */
     addReplyMultiBulkLen(c,rangelen);
 
     /* Return the result in form of a multi-bulk reply */
     addReplyMultiBulkLen(c,rangelen);
-    listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL);
-    for (j = 0; j < rangelen; j++) {
-        redisAssert(listTypeNext(li,&entry));
-        value = listTypeGet(&entry);
-        addReplyBulk(c,value);
-        decrRefCount(value);
+    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *p = ziplistIndex(o->ptr,start);
+        unsigned char *vstr;
+        unsigned int vlen;
+        long long vlong;
+
+        while(rangelen--) {
+            ziplistGet(p,&vstr,&vlen,&vlong);
+            if (vstr) {
+                addReplyBulkCBuffer(c,vstr,vlen);
+            } else {
+                addReplyBulkLongLong(c,vlong);
+            }
+            p = ziplistNext(o->ptr,p);
+        }
+    } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
+        listNode *ln = listIndex(o->ptr,start);
+
+        while(rangelen--) {
+            addReplyBulk(c,ln->value);
+            ln = ln->next;
+        }
+    } else {
+        redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
     }
     }
-    listTypeReleaseIterator(li);
 }
 
 void ltrimCommand(redisClient *c) {
 }
 
 void ltrimCommand(redisClient *c) {
@@ -557,7 +578,7 @@ void ltrimCommand(redisClient *c) {
         redisPanic("Unknown list encoding");
     }
     if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
         redisPanic("Unknown list encoding");
     }
     if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
     addReply(c,shared.ok);
 }
     server.dirty++;
     addReply(c,shared.ok);
 }
@@ -600,7 +621,7 @@ void lremCommand(redisClient *c) {
 
     if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
     addReplyLongLong(c,removed);
 
     if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
     addReplyLongLong(c,removed);
-    if (removed) touchWatchedKey(c->db,c->argv[1]);
+    if (removed) signalModifiedKey(c->db,c->argv[1]);
 }
 
 /* This is the semantic of this command:
 }
 
 /* This is the semantic of this command:
@@ -626,7 +647,7 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
             dstobj = createZiplistObject();
             dbAdd(c->db,dstkey,dstobj);
         } else {
             dstobj = createZiplistObject();
             dbAdd(c->db,dstkey,dstobj);
         } else {
-            touchWatchedKey(c->db,dstkey);
+            signalModifiedKey(c->db,dstkey);
             server.dirty++;
         }
         listTypePush(dstobj,value,REDIS_HEAD);
             server.dirty++;
         }
         listTypePush(dstobj,value,REDIS_HEAD);
@@ -654,7 +675,7 @@ void rpoplpushCommand(redisClient *c) {
 
         /* Delete the source list when it is empty */
         if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
 
         /* Delete the source list when it is empty */
         if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
         server.dirty++;
     }
 }
@@ -757,14 +778,10 @@ void unblockClientWaitingData(redisClient *c) {
     zfree(c->bpop.keys);
     c->bpop.keys = NULL;
     c->bpop.target = NULL;
     zfree(c->bpop.keys);
     c->bpop.keys = NULL;
     c->bpop.target = NULL;
-    c->flags &= (~REDIS_BLOCKED);
+    c->flags &= ~REDIS_BLOCKED;
+    c->flags |= REDIS_UNBLOCKED;
     server.bpop_blocked_clients--;
     server.bpop_blocked_clients--;
-    /* We want to process data if there is some command waiting
-     * in the input buffer. Note that this is safe even if
-     * unblockClientWaitingData() gets called from freeClient() because
-     * freeClient() will be smart enough to call this function
-     * *after* c->querybuf was set to NULL. */
-    if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
+    listAddNodeTail(server.unblocked_clients,c);
 }
 
 /* This should be called from any function PUSHing into lists.
 }
 
 /* This should be called from any function PUSHing into lists.
@@ -805,7 +822,6 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
 
         /* This should remove the first element of the "clients" list. */
         unblockClientWaitingData(receiver);
 
         /* This should remove the first element of the "clients" list. */
         unblockClientWaitingData(receiver);
-        redisAssert(ln != listFirst(clients));
 
         if (dstkey == NULL) {
             /* BRPOP/BLPOP */
 
         if (dstkey == NULL) {
             /* BRPOP/BLPOP */
@@ -814,7 +830,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
             addReplyBulk(receiver,ele);
             return 1;
         } else {
             addReplyBulk(receiver,ele);
             return 1;
         } else {
-            /* BRPOPLPUSH */
+            /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
             dstobj = lookupKeyWrite(receiver->db,dstkey);
             if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
                 decrRefCount(dstkey);
             dstobj = lookupKeyWrite(receiver->db,dstkey);
             if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
                 decrRefCount(dstkey);
@@ -929,7 +945,7 @@ void brpoplpushCommand(redisClient *c) {
 
             /* Blocking against an empty list in a multi state
              * returns immediately. */
 
             /* Blocking against an empty list in a multi state
              * returns immediately. */
-            addReply(c, shared.nullmultibulk);
+            addReply(c, shared.nullbulk);
         } else {
             /* The list is empty and the client blocks. */
             blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
         } else {
             /* The list is empty and the client blocks. */
             blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);