]> git.saurik.com Git - redis.git/commitdiff
BITOP command 10x speed improvement.
authorantirez <antirez@gmail.com>
Wed, 23 May 2012 20:12:50 +0000 (22:12 +0200)
committerantirez <antirez@gmail.com>
Thu, 24 May 2012 13:25:13 +0000 (15:25 +0200)
This commit adds a fast-path to the BITOP that can be used for all the
bytes from 0 to the minimal length of the string, and if there are
at max 16 input keys.

Often the intersected bitmaps are roughly the same size, so this
optimization can provide a 10x speed boost to most real world usages
of the command.

Bytes are processed four full words at a time, in loops specialized
for the specific BITOP sub-command, without the need to check for
length issues with the inputs (since we run this algorithm only as far
as there is data from all the keys at the same time).

The remaining part of the string is intersected in the usual way using
the slow but generic algorith.

It is possible to do better than this with inputs that are not roughly
the same size, sorting the input keys by length, by initializing the
result string in a smarter way, and noticing that the final part of the
output string composed of only data from the longest string does not
need any proecessing since AND, OR and XOR against an empty string does
not alter the output (zero in the first case, and the original string in
the other two cases).

More implementations will be implemented later likely, but this should
be enough to release Redis 2.6-RC4 with bitops merged in.

Note: this commit also adds better testing for BITOP NOT command, that
is currently the faster and hard to optimize further since it just
flips the bits of a single input string.

src/bitops.c
tests/unit/bitops.tcl

index 2f62ccc460494bd0f12b6482cc3fceea545f950c..d309b7afa15cc05b7b4b33df9e1a18289ddb702e 100644 (file)
@@ -162,6 +162,7 @@ void bitopCommand(redisClient *c) {
     robj **objects;      /* Array of soruce objects. */
     unsigned char **src; /* Array of source strings pointers. */
     long *len, maxlen = 0; /* Array of length of src strings, and max len. */
+    long minlen = 0;    /* Min len among the input keys. */
     unsigned char *res = NULL; /* Resulting string. */
 
     /* Parse the operation name. */
@@ -213,6 +214,7 @@ void bitopCommand(redisClient *c) {
         src[j] = objects[j]->ptr;
         len[j] = sdslen(objects[j]->ptr);
         if (len[j] > maxlen) maxlen = len[j];
+        if (j == 0 || len[j] < minlen) minlen = len[j];
     }
 
     /* Compute the bit operation, if at least one string is not empty. */
@@ -221,7 +223,73 @@ void bitopCommand(redisClient *c) {
         unsigned char output, byte;
         long i;
 
-        for (j = 0; j < maxlen; j++) {
+        /* Fast path: as far as we have data for all the input bitmaps we
+         * can take a fast path that performs much better than the
+         * vanilla algorithm. */
+        j = 0;
+        if (minlen && numkeys <= 16) {
+            unsigned long *lp[16];
+            unsigned long *lres = (unsigned long*) res;
+
+            /* Note: sds pointer is always aligned to 8 byte boundary. */
+            memcpy(lp,src,sizeof(unsigned long*)*numkeys);
+            memcpy(res,src[0],minlen);
+
+            /* Different branches per different operations for speed (sorry). */
+            if (op == BITOP_AND) {
+                while(minlen >= sizeof(unsigned long)*4) {
+                    for (i = 1; i < numkeys; i++) {
+                        lres[0] &= lp[i][0];
+                        lres[1] &= lp[i][1];
+                        lres[2] &= lp[i][2];
+                        lres[3] &= lp[i][3];
+                        lp[i]+=4;
+                    }
+                    lres+=4;
+                    j += sizeof(unsigned long)*4;
+                    minlen -= sizeof(unsigned long)*4;
+                }
+            } else if (op == BITOP_OR) {
+                while(minlen >= sizeof(unsigned long)*4) {
+                    for (i = 1; i < numkeys; i++) {
+                        lres[0] |= lp[i][0];
+                        lres[1] |= lp[i][1];
+                        lres[2] |= lp[i][2];
+                        lres[3] |= lp[i][3];
+                        lp[i]+=4;
+                    }
+                    lres+=4;
+                    j += sizeof(unsigned long)*4;
+                    minlen -= sizeof(unsigned long)*4;
+                }
+            } else if (op == BITOP_XOR) {
+                while(minlen >= sizeof(unsigned long)*4) {
+                    for (i = 1; i < numkeys; i++) {
+                        lres[0] ^= lp[i][0];
+                        lres[1] ^= lp[i][1];
+                        lres[2] ^= lp[i][2];
+                        lres[3] ^= lp[i][3];
+                        lp[i]+=4;
+                    }
+                    lres+=4;
+                    j += sizeof(unsigned long)*4;
+                    minlen -= sizeof(unsigned long)*4;
+                }
+            } else if (op == BITOP_NOT) {
+                while(minlen >= sizeof(unsigned long)*4) {
+                    lres[0] = ~lres[0];
+                    lres[1] = ~lres[1];
+                    lres[2] = ~lres[2];
+                    lres[3] = ~lres[3];
+                    lres+=4;
+                    j += sizeof(unsigned long)*4;
+                    minlen -= sizeof(unsigned long)*4;
+                }
+            }
+        }
+
+        /* j is set to the next byte to process by the previous loop. */
+        for (; j < maxlen; j++) {
             output = (len[0] <= j) ? 0 : src[0][j];
             if (op == BITOP_NOT) output = ~output;
             for (i = 1; i < numkeys; i++) {
index b7a76abb12921fcca06376700b9b80b369cce55e..c8f58ef1e5931b5a0b6fb34a88e881cf449ea6e7 100644 (file)
@@ -24,13 +24,13 @@ proc simulate_bit_op {op args} {
     set out {}
     for {set x 0} {$x < $maxlen} {incr x} {
         set bit [string range $b(0) $x $x]
+        if {$op eq {not}} {set bit [expr {!$bit}]}
         for {set j 1} {$j < $count} {incr j} {
             set bit2 [string range $b($j) $x $x]
             switch $op {
                 and {set bit [expr {$bit & $bit2}]}
                 or  {set bit [expr {$bit | $bit2}]}
                 xor {set bit [expr {$bit ^ $bit2}]}
-                not {set bit [expr {!$bit}]}
             }
         }
         append out $bit
@@ -135,6 +135,16 @@ start_server {tags {"bitops"}} {
         }
     }
 
+    test {BITOP NOT fuzzing} {
+        for {set i 0} {$i < 10} {incr i} {
+            r flushall
+            set str [randstring 0 1000]
+            r set str $str
+            r bitop not target str
+            assert_equal [r get target] [simulate_bit_op not $str]
+        }
+    }
+
     test {BITOP with integer encoded source objects} {
         r set a 1
         r set b 2