]> git.saurik.com Git - redis.git/blobdiff - src/t_list.c
MIGRATE now lets the client distinguish I/O errors and timeouts from other errors.
[redis.git] / src / t_list.c
index c5e3df61490651f0fce92477e93b05e7674c6f1f..6a16a63201921028c4b13014d8039238d1be599c 100644 (file)
@@ -86,7 +86,7 @@ unsigned long listTypeLength(robj *subject) {
 }
 
 /* Initialize an iterator at the specified index. */
 }
 
 /* Initialize an iterator at the specified index. */
-listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction) {
+listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) {
     listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
     li->subject = subject;
     li->encoding = subject->encoding;
     listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
     li->subject = subject;
     li->encoding = subject->encoding;
@@ -259,7 +259,7 @@ void listTypeConvert(robj *subject, int enc) {
  *----------------------------------------------------------------------------*/
 
 void pushGenericCommand(redisClient *c, int where) {
  *----------------------------------------------------------------------------*/
 
 void pushGenericCommand(redisClient *c, int where) {
-    int j, addlen = 0, pushed = 0;
+    int j, waiting = 0, pushed = 0;
     robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
     int may_have_waiting_clients = (lobj == NULL);
 
     robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
     int may_have_waiting_clients = (lobj == NULL);
 
@@ -272,7 +272,7 @@ void pushGenericCommand(redisClient *c, int where) {
         c->argv[j] = tryObjectEncoding(c->argv[j]);
         if (may_have_waiting_clients) {
             if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
         c->argv[j] = tryObjectEncoding(c->argv[j]);
         if (may_have_waiting_clients) {
             if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
-                addlen++;
+                waiting++;
                 continue;
             } else {
                 may_have_waiting_clients = 0;
                 continue;
             } else {
                 may_have_waiting_clients = 0;
@@ -285,9 +285,21 @@ void pushGenericCommand(redisClient *c, int where) {
         listTypePush(lobj,c->argv[j],where);
         pushed++;
     }
         listTypePush(lobj,c->argv[j],where);
         pushed++;
     }
-    addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
+    addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
     if (pushed) signalModifiedKey(c->db,c->argv[1]);
     server.dirty += pushed;
     if (pushed) signalModifiedKey(c->db,c->argv[1]);
     server.dirty += pushed;
+
+    /* Alter the replication of the command according to the number of
+     * list elements delivered to clients waiting in a blocking operation.
+     * We do that only if there were waiting clients, and only if some
+     * element was still pushed into the list (otherwise dirty is 0 and
+     * nothing will be propagated). */
+    if (waiting && pushed) {
+        /* CMD KEY a b C D E */
+        for (j = 0; j < waiting; j++) decrRefCount(c->argv[j+2]);
+        memmove(c->argv+2,c->argv+2+waiting,sizeof(robj*)*pushed);
+        c->argc -= waiting;
+    }
 }
 
 void lpushCommand(redisClient *c) {
 }
 
 void lpushCommand(redisClient *c) {
@@ -381,9 +393,12 @@ void llenCommand(redisClient *c) {
 void lindexCommand(redisClient *c) {
     robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
 void lindexCommand(redisClient *c) {
     robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
-    int index = atoi(c->argv[2]->ptr);
+    long index;
     robj *value = NULL;
 
     robj *value = NULL;
 
+    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
+        return;
+
     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *p;
         unsigned char *vstr;
     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *p;
         unsigned char *vstr;
@@ -417,9 +432,12 @@ void lindexCommand(redisClient *c) {
 void lsetCommand(redisClient *c) {
     robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
 void lsetCommand(redisClient *c) {
     robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
-    int index = atoi(c->argv[2]->ptr);
+    long index;
     robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
 
     robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
 
+    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
+        return;
+
     listTypeTryConversion(o,value);
     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *p, *zl = o->ptr;
     listTypeTryConversion(o,value);
     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *p, *zl = o->ptr;
@@ -478,10 +496,10 @@ void rpopCommand(redisClient *c) {
 
 void lrangeCommand(redisClient *c) {
     robj *o;
 
 void lrangeCommand(redisClient *c) {
     robj *o;
-    int start = atoi(c->argv[2]->ptr);
-    int end = atoi(c->argv[3]->ptr);
-    int llen;
-    int rangelen;
+    long start, end, llen, rangelen;
+
+    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
+        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
          || checkType(c,o,REDIS_LIST)) return;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
          || checkType(c,o,REDIS_LIST)) return;
@@ -537,13 +555,13 @@ void lrangeCommand(redisClient *c) {
 
 void ltrimCommand(redisClient *c) {
     robj *o;
 
 void ltrimCommand(redisClient *c) {
     robj *o;
-    int start = atoi(c->argv[2]->ptr);
-    int end = atoi(c->argv[3]->ptr);
-    int llen;
-    int j, ltrim, rtrim;
+    long start, end, llen, j, ltrim, rtrim;
     list *list;
     listNode *ln;
 
     list *list;
     listNode *ln;
 
+    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
+        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
+
     if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
         checkType(c,o,REDIS_LIST)) return;
     llen = listTypeLength(o);
     if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
         checkType(c,o,REDIS_LIST)) return;
     llen = listTypeLength(o);
@@ -591,10 +609,13 @@ void ltrimCommand(redisClient *c) {
 void lremCommand(redisClient *c) {
     robj *subject, *obj;
     obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
 void lremCommand(redisClient *c) {
     robj *subject, *obj;
     obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
-    int toremove = atoi(c->argv[2]->ptr);
-    int removed = 0;
+    long toremove;
+    long removed = 0;
     listTypeEntry entry;
 
     listTypeEntry entry;
 
+    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
+        return;
+
     subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
     if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
 
     subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
     if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
 
@@ -646,8 +667,6 @@ void lremCommand(redisClient *c) {
  */
 
 void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
  */
 
 void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
-    robj *aux;
-
     if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
         /* Create the list if the key does not exist */
         if (!dstobj) {
     if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
         /* Create the list if the key does not exist */
         if (!dstobj) {
@@ -657,27 +676,19 @@ void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey,
             signalModifiedKey(c->db,dstkey);
         }
         listTypePush(dstobj,value,REDIS_HEAD);
             signalModifiedKey(c->db,dstkey);
         }
         listTypePush(dstobj,value,REDIS_HEAD);
-        /* If we are pushing as a result of LPUSH against a key
-         * watched by BRPOPLPUSH, we need to rewrite the command vector
-         * as an LPUSH.
-         *
-         * If this is called directly by RPOPLPUSH (either directly
-         * or via a BRPOPLPUSH where the popped list exists)
-         * we should replicate the RPOPLPUSH command itself. */
-        if (c != origclient) {
-            aux = createStringObject("LPUSH",5);
-            rewriteClientCommandVector(origclient,3,aux,dstkey,value);
-            decrRefCount(aux);
-        } else {
-            /* Make sure to always use RPOPLPUSH in the replication / AOF,
-             * even if the original command was BRPOPLPUSH. */
-            aux = createStringObject("RPOPLPUSH",9);
-            rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
-            decrRefCount(aux);
+        /* Additionally propagate this PUSH operation together with
+         * the operation performed by the command. */
+        {
+            robj **argv = zmalloc(sizeof(robj*)*3);
+            argv[0] = createStringObject("LPUSH",5);
+            argv[1] = dstkey;
+            argv[2] = value;
+            incrRefCount(argv[1]);
+            incrRefCount(argv[2]);
+            alsoPropagate(server.lpushCommand,c->db->id,argv,3,
+                          REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
         }
         }
-        server.dirty++;
     }
     }
-
     /* Always send the pushed value to the client. */
     addReplyBulk(c,value);
 }
     /* Always send the pushed value to the client. */
     addReplyBulk(c,value);
 }
@@ -708,6 +719,13 @@ void rpoplpushCommand(redisClient *c) {
         signalModifiedKey(c->db,touchedkey);
         decrRefCount(touchedkey);
         server.dirty++;
         signalModifiedKey(c->db,touchedkey);
         decrRefCount(touchedkey);
         server.dirty++;
+
+        /* Replicate this as a simple RPOP, since the LPUSH side is replicated
+         * by the rpoplpushHandlePush() call if needed (it may not be needed
+         * if a client is blocked waiting for a push against the list). */
+        rewriteClientCommandVector(c,2,
+            resetRefCount(createStringObject("RPOP",4)),
+            c->argv[1]);
     }
 }
 
     }
 }
 
@@ -776,7 +794,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj
             incrRefCount(keys[j]);
             redisAssertWithInfo(c,keys[j],retval == DICT_OK);
         } else {
             incrRefCount(keys[j]);
             redisAssertWithInfo(c,keys[j],retval == DICT_OK);
         } else {
-            l = dictGetEntryVal(de);
+            l = dictGetVal(de);
         }
         listAddNodeTail(l,c);
     }
         }
         listAddNodeTail(l,c);
     }
@@ -797,7 +815,7 @@ void unblockClientWaitingData(redisClient *c) {
         /* Remove this client from the list of clients waiting for this key. */
         de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
         redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
         /* Remove this client from the list of clients waiting for this key. */
         de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
         redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
-        l = dictGetEntryVal(de);
+        l = dictGetVal(de);
         listDelNode(l,listSearchKey(l,c));
         /* If the list is empty we need to remove it to avoid wasting memory */
         if (listLength(l) == 0)
         listDelNode(l,listSearchKey(l,c));
         /* If the list is empty we need to remove it to avoid wasting memory */
         if (listLength(l) == 0)
@@ -836,7 +854,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
 
     de = dictFind(c->db->blocking_keys,key);
     if (de == NULL) return 0;
 
     de = dictFind(c->db->blocking_keys,key);
     if (de == NULL) return 0;
-    clients = dictGetEntryVal(de);
+    clients = dictGetVal(de);
     numclients = listLength(clients);
 
     /* Try to handle the push as long as there are clients waiting for a push.
     numclients = listLength(clients);
 
     /* Try to handle the push as long as there are clients waiting for a push.
@@ -892,7 +910,7 @@ int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
         return REDIS_ERR;
     }
 
         return REDIS_ERR;
     }
 
-    if (tval > 0) tval += time(NULL);
+    if (tval > 0) tval += server.unixtime;
     *timeout = tval;
 
     return REDIS_OK;
     *timeout = tval;
 
     return REDIS_OK;
@@ -915,36 +933,22 @@ void blockingPopGenericCommand(redisClient *c, int where) {
                 return;
             } else {
                 if (listTypeLength(o) != 0) {
                 return;
             } else {
                 if (listTypeLength(o) != 0) {
-                    /* If the list contains elements fall back to the usual
-                     * non-blocking POP operation */
-                    struct redisCommand *orig_cmd;
-                    robj *argv[2], **orig_argv;
-                    int orig_argc;
-
-                    /* We need to alter the command arguments before to call
-                     * popGenericCommand() as the command takes a single key. */
-                    orig_argv = c->argv;
-                    orig_argc = c->argc;
-                    orig_cmd = c->cmd;
-                    argv[1] = c->argv[j];
-                    c->argv = argv;
-                    c->argc = 2;
-
-                    /* Also the return value is different, we need to output
-                     * the multi bulk reply header and the key name. The
-                     * "real" command will add the last element (the value)
-                     * for us. If this souds like an hack to you it's just
-                     * because it is... */
-                    addReplyMultiBulkLen(c,2);
-                    addReplyBulk(c,argv[1]);
-
-                    popGenericCommand(c,where);
-
-                    /* Fix the client structure with the original stuff */
-                    c->argv = orig_argv;
-                    c->argc = orig_argc;
-                    c->cmd = orig_cmd;
+                    /* Non-empty list: this is like a normal, non-blocking [LR]POP. */
+                    robj *value = listTypePop(o,where);
+                    redisAssert(value != NULL);
 
 
+                    addReplyMultiBulkLen(c,2);
+                    addReplyBulk(c,c->argv[j]);
+                    addReplyBulk(c,value);
+                    decrRefCount(value);
+                    if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
+                    signalModifiedKey(c->db,c->argv[j]);
+                    server.dirty++;
+
+                    /* Replicate it as an [LR]POP instead of B[LR]POP. */
+                    rewriteClientCommandVector(c,2,
+                        (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
+                        c->argv[j]);
                     return;
                 }
             }
                     return;
                 }
             }