From d866803818fb47a851e5730ccff634f993ce6f68 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 23 May 2012 22:12:50 +0200 Subject: [PATCH] BITOP command 10x speed improvement. This commit adds a fast-path to the BITOP that can be used for all the bytes from 0 to the minimal length of the string, and if there are at max 16 input keys. Often the intersected bitmaps are roughly the same size, so this optimization can provide a 10x speed boost to most real world usages of the command. Bytes are processed four full words at a time, in loops specialized for the specific BITOP sub-command, without the need to check for length issues with the inputs (since we run this algorithm only as far as there is data from all the keys at the same time). The remaining part of the string is intersected in the usual way using the slow but generic algorith. It is possible to do better than this with inputs that are not roughly the same size, sorting the input keys by length, by initializing the result string in a smarter way, and noticing that the final part of the output string composed of only data from the longest string does not need any proecessing since AND, OR and XOR against an empty string does not alter the output (zero in the first case, and the original string in the other two cases). More implementations will be implemented later likely, but this should be enough to release Redis 2.6-RC4 with bitops merged in. Note: this commit also adds better testing for BITOP NOT command, that is currently the faster and hard to optimize further since it just flips the bits of a single input string. --- src/bitops.c | 70 ++++++++++++++++++++++++++++++++++++++++++- tests/unit/bitops.tcl | 12 +++++++- 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/src/bitops.c b/src/bitops.c index 2f62ccc4..d309b7af 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -162,6 +162,7 @@ void bitopCommand(redisClient *c) { robj **objects; /* Array of soruce objects. */ unsigned char **src; /* Array of source strings pointers. */ long *len, maxlen = 0; /* Array of length of src strings, and max len. */ + long minlen = 0; /* Min len among the input keys. */ unsigned char *res = NULL; /* Resulting string. */ /* Parse the operation name. */ @@ -213,6 +214,7 @@ void bitopCommand(redisClient *c) { src[j] = objects[j]->ptr; len[j] = sdslen(objects[j]->ptr); if (len[j] > maxlen) maxlen = len[j]; + if (j == 0 || len[j] < minlen) minlen = len[j]; } /* Compute the bit operation, if at least one string is not empty. */ @@ -221,7 +223,73 @@ void bitopCommand(redisClient *c) { unsigned char output, byte; long i; - for (j = 0; j < maxlen; j++) { + /* Fast path: as far as we have data for all the input bitmaps we + * can take a fast path that performs much better than the + * vanilla algorithm. */ + j = 0; + if (minlen && numkeys <= 16) { + unsigned long *lp[16]; + unsigned long *lres = (unsigned long*) res; + + /* Note: sds pointer is always aligned to 8 byte boundary. */ + memcpy(lp,src,sizeof(unsigned long*)*numkeys); + memcpy(res,src[0],minlen); + + /* Different branches per different operations for speed (sorry). */ + if (op == BITOP_AND) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] &= lp[i][0]; + lres[1] &= lp[i][1]; + lres[2] &= lp[i][2]; + lres[3] &= lp[i][3]; + lp[i]+=4; + } + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_OR) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] |= lp[i][0]; + lres[1] |= lp[i][1]; + lres[2] |= lp[i][2]; + lres[3] |= lp[i][3]; + lp[i]+=4; + } + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_XOR) { + while(minlen >= sizeof(unsigned long)*4) { + for (i = 1; i < numkeys; i++) { + lres[0] ^= lp[i][0]; + lres[1] ^= lp[i][1]; + lres[2] ^= lp[i][2]; + lres[3] ^= lp[i][3]; + lp[i]+=4; + } + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } else if (op == BITOP_NOT) { + while(minlen >= sizeof(unsigned long)*4) { + lres[0] = ~lres[0]; + lres[1] = ~lres[1]; + lres[2] = ~lres[2]; + lres[3] = ~lres[3]; + lres+=4; + j += sizeof(unsigned long)*4; + minlen -= sizeof(unsigned long)*4; + } + } + } + + /* j is set to the next byte to process by the previous loop. */ + for (; j < maxlen; j++) { output = (len[0] <= j) ? 0 : src[0][j]; if (op == BITOP_NOT) output = ~output; for (i = 1; i < numkeys; i++) { diff --git a/tests/unit/bitops.tcl b/tests/unit/bitops.tcl index b7a76abb..c8f58ef1 100644 --- a/tests/unit/bitops.tcl +++ b/tests/unit/bitops.tcl @@ -24,13 +24,13 @@ proc simulate_bit_op {op args} { set out {} for {set x 0} {$x < $maxlen} {incr x} { set bit [string range $b(0) $x $x] + if {$op eq {not}} {set bit [expr {!$bit}]} for {set j 1} {$j < $count} {incr j} { set bit2 [string range $b($j) $x $x] switch $op { and {set bit [expr {$bit & $bit2}]} or {set bit [expr {$bit | $bit2}]} xor {set bit [expr {$bit ^ $bit2}]} - not {set bit [expr {!$bit}]} } } append out $bit @@ -135,6 +135,16 @@ start_server {tags {"bitops"}} { } } + test {BITOP NOT fuzzing} { + for {set i 0} {$i < 10} {incr i} { + r flushall + set str [randstring 0 1000] + r set str $str + r bitop not target str + assert_equal [r get target] [simulate_bit_op not $str] + } + } + test {BITOP with integer encoded source objects} { r set a 1 r set b 2 -- 2.45.2