From: antirez Date: Wed, 16 May 2012 14:23:09 +0000 (+0200) Subject: New commands: BITOP and BITCOUNT. X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/4a789decaa757e7ceb55cb49248070ba7ae15963?hp=bec74b1b9cd57358f24bd7231f879120a5bf61ab New commands: BITOP and BITCOUNT. The motivation for these new commands is to be found in the usage of Redis for real time statistics. See the article "Fast real time metrics using Redis". http://blog.getspool.com/2011/11/29/fast-easy-realtime-metrics-using-redis-bitmaps/ In general Redis strings, when used as bitmaps via the SETBIT/GETBIT commands, provide a very space-efficient and fast way to store statistics. For instance in a web application with users, every user can be associated with a key that shows every day in which the user visited the web service. This information can be really valuable to extract user behaviour information. With Redis bitmaps doing this is very simple: just say that a given day is 0 (the date the service was put online) and all the next days are 1, 2, 3, and so forth. So with SETBIT it is possible to set the bit corresponding to the current day every time the user visits the site. It is possible to take the count of the set bits on the fly; this is extremely easy using a Lua script. However a fast bit count native operation can be useful, especially if it can operate on ranges, or when the string is small like in the case of days (even if you consider many years it is still extremely little data). For this reason BITCOUNT was introduced. The command counts the number of bits set to 1 in a string, with optional range: BITCOUNT key [start end] The start/end parameters are similar to GETRANGE. If omitted the whole string is tested. Population counting is more useful when bit-level operations like AND, OR and XOR are available. For instance I can test multiple users to see the number of days three users visited the site at the same time. 
To do this we can take the AND of all the bitmaps, and then count the set bits. For this reason the BITOP command was introduced: BITOP [AND|OR|XOR|NOT] dest_key src_key1 src_key2 src_key3 ... src_keyN In the special case of NOT (that inverts the bits) only one source key can be passed. The judicious use of BITCOUNT and BITOP combined can lead to interesting use cases with very space efficient representation of data. The implementation provided is still not tested and optimized for speed, next commits will introduce unit tests. Later the implementation will be profiled to see if it is possible to gain an important amount of speed without making the code much more complex. --- diff --git a/src/redis.c b/src/redis.c index c9a2c5ba..9937cf56 100644 --- a/src/redis.c +++ b/src/redis.c @@ -242,7 +242,9 @@ struct redisCommand redisCommandTable[] = { {"evalsha",evalShaCommand,-3,"s",0,zunionInterGetKeys,0,0,0,0,0}, {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0}, {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}, - {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0} + {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}, + {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0}, + {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0} }; /*============================ Utility functions ============================ */ diff --git a/src/redis.h b/src/redis.h index 5d11799a..cf971667 100644 --- a/src/redis.h +++ b/src/redis.h @@ -1103,6 +1103,8 @@ void evalCommand(redisClient *c); void evalShaCommand(redisClient *c); void scriptCommand(redisClient *c); void timeCommand(redisClient *c); +void bitopCommand(redisClient *c); +void bitcountCommand(redisClient *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/t_string.c b/src/t_string.c index d6143ed2..46637d70 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -463,3 +463,152 @@ void strlenCommand(redisClient *c) { addReplyLongLong(c,stringObjectLen(o)); } +#define BITOP_AND 0 +#define 
BITOP_OR 1 +#define BITOP_XOR 2 +#define BITOP_NOT 3 +/* BITOP op_name target_key src_key1 src_key2 src_key3 ... src_keyN */ +void bitopCommand(redisClient *c) { + char *opname = c->argv[1]->ptr; + robj *o, *targetkey = c->argv[2]; + long op, j, numkeys; + unsigned char **src; /* Array of source strings pointers. */ + long *len, maxlen = 0; /* Array of length of src strings, and max len. */ + unsigned char *res = NULL; /* Resulting string. */ + + /* Parse the operation name. */ + if ((opname[0] == 'a' || opname[0] == 'A') && !strcasecmp(opname,"and")) + op = BITOP_AND; + else if((opname[0] == 'o' || opname[0] == 'O') && !strcasecmp(opname,"or")) + op = BITOP_OR; + else if((opname[0] == 'x' || opname[0] == 'X') && !strcasecmp(opname,"xor")) + op = BITOP_XOR; + else if((opname[0] == 'n' || opname[0] == 'N') && !strcasecmp(opname,"not")) + op = BITOP_NOT; + else { + addReply(c,shared.syntaxerr); + return; + } + + /* Sanity check: NOT accepts only a single key argument. */ + if (op == BITOP_NOT && c->argc != 4) { + addReplyError(c,"BITOP NOT must be called with a single source key."); + return; + } + + /* Lookup keys, and store pointers to the string objects into an array. */ + numkeys = c->argc - 3; + src = zmalloc(sizeof(unsigned char*) * numkeys); + len = zmalloc(sizeof(long) * numkeys); + for (j = 0; j < numkeys; j++) { + o = lookupKeyRead(c->db,c->argv[j+3]); + /* Handle non-existing keys as empty strings. */ + if (o == NULL) { + src[j] = NULL; + len[j] = 0; + continue; + } + /* Return an error if one of the keys is not a string. */ + if (checkType(c,o,REDIS_STRING)) { + zfree(src); + zfree(len); + return; + } + src[j] = o->ptr; + len[j] = sdslen(o->ptr); + if (len[j] > maxlen) maxlen = len[j]; + } + + /* Compute the bit operation, if at least one string is not empty. */ + if (maxlen) { + res = (unsigned char*) sdsnewlen(NULL,maxlen); + unsigned char output, byte; + long i; + + for (j = 0; j < maxlen; j++) { + output = (len[0] <= j) ? 
0 : src[0][j]; + if (op == BITOP_NOT) output = ~output; + for (i = 1; i < numkeys; i++) { + byte = (len[i] <= j) ? 0 : src[i][j]; + switch(op) { + case BITOP_AND: output &= byte; break; + case BITOP_OR: output |= byte; break; + case BITOP_XOR: output ^= byte; break; + } + } + res[j] = output; + } + } + zfree(src); + zfree(len); + + /* Store the computed value into the target key */ + if (maxlen) { + o = createObject(REDIS_STRING,res); + setKey(c->db,targetkey,o); + decrRefCount(o); + } else if (dbDelete(c->db,targetkey)) { + signalModifiedKey(c->db,targetkey); + } + server.dirty++; + addReplyLongLong(c,maxlen); /* Return the output string length in bytes. */ +} + +/* BITCOUNT key [start end] */ +void bitcountCommand(redisClient *c) { + static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8}; + robj *o; + long start, end; + unsigned char *p; + char llbuf[32]; + size_t strlen; + + /* Lookup, check for type, and return 0 for non existing keys. */ + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,o,REDIS_STRING)) return; + + /* Set the 'p' pointer to the string, that can be just a stack allocated + * array if our string was integer encoded. */ + if (o->encoding == REDIS_ENCODING_INT) { + p = (unsigned char*) llbuf; + strlen = ll2string(llbuf,sizeof(llbuf),(long)o->ptr); + } else { + p = (unsigned char*) o->ptr; + strlen = sdslen(o->ptr); + } + + /* Parse start/end range if any. 
*/ + if (c->argc == 4) { + if (getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != REDIS_OK) + return; + if (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != REDIS_OK) + return; + /* Convert negative indexes */ + if (start < 0) start = strlen+start; + if (end < 0) end = strlen+end; + if (start < 0) start = 0; + if (end < 0) end = 0; + if ((unsigned)end >= strlen) end = strlen-1; + } else if (c->argc == 2) { + /* The whole string. */ + start = 0; + end = strlen-1; + } else { + /* Syntax error. */ + addReply(c,shared.syntaxerr); + return; + } + + /* Precondition: end >= 0 && end < strlen, so the only condition where + * zero can be returned is: start > end. */ + if (start > end) { + addReply(c,shared.czero); + } else { + long bits = 0, bytes = end-start+1; + + /* We can finally count bits. */ + p += start; + while(bytes--) bits += bitsinbyte[*p++]; + addReplyLongLong(c,bits); + } +}