]>
git.saurik.com Git - redis.git/blob - src/sort.c
1 /* SORT command and helper functions.
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
33 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
34 #include <math.h> /* isnan() */
36 zskiplistNode
* zslGetElementByRank(zskiplist
*zsl
, unsigned long rank
);
38 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
39 redisSortOperation
*so
= zmalloc(sizeof(*so
));
41 so
->pattern
= pattern
;
45 /* Return the value associated to the key with a name obtained using
46 * the following rules:
48 * 1) The first occurence of '*' in 'pattern' is substituted with 'subst'.
50 * 2) If 'pattern' matches the "->" string, everything on the left of
51 * the arrow is treated as the name of an hash field, and the part on the
52 * left as the key name containing an hash. The value of the specified
55 * 3) If 'pattern' equals "#", the function simply returns 'subst' itself so
56 * that the SORT command can be used like: SORT key GET # to retrieve
57 * the Set/List elements directly.
59 * The returned object will always have its refcount increased by 1
60 * when it is non-NULL. */
61 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
64 robj
*keyobj
, *fieldobj
= NULL
, *o
;
65 int prefixlen
, sublen
, postfixlen
, fieldlen
;
67 /* If the pattern is "#" return the substitution object itself in order
68 * to implement the "SORT ... GET #" feature. */
70 if (spat
[0] == '#' && spat
[1] == '\0') {
75 /* The substitution object may be specially encoded. If so we create
76 * a decoded object on the fly. Otherwise getDecodedObject will just
77 * increment the ref count, that we'll decrement later. */
78 subst
= getDecodedObject(subst
);
81 /* If we can't find '*' in the pattern we return NULL as to GET a
82 * fixed key does not make sense. */
89 /* Find out if we're dealing with a hash dereference. */
90 if ((f
= strstr(p
+1, "->")) != NULL
&& *(f
+2) != '\0') {
91 fieldlen
= sdslen(spat
)-(f
-spat
)-2;
92 fieldobj
= createStringObject(f
+2,fieldlen
);
97 /* Perform the '*' substitution. */
99 sublen
= sdslen(ssub
);
100 postfixlen
= sdslen(spat
)-(prefixlen
+1)-(fieldlen
? fieldlen
+2 : 0);
101 keyobj
= createStringObject(NULL
,prefixlen
+sublen
+postfixlen
);
103 memcpy(k
,spat
,prefixlen
);
104 memcpy(k
+prefixlen
,ssub
,sublen
);
105 memcpy(k
+prefixlen
+sublen
,p
+1,postfixlen
);
106 decrRefCount(subst
); /* Incremented by decodeObject() */
108 /* Lookup substituted key */
109 o
= lookupKeyRead(db
,keyobj
);
110 if (o
== NULL
) goto noobj
;
113 if (o
->type
!= REDIS_HASH
) goto noobj
;
115 /* Retrieve value from hash by the field name. This operation
116 * already increases the refcount of the returned object. */
117 o
= hashTypeGetObject(o
, fieldobj
);
119 if (o
->type
!= REDIS_STRING
) goto noobj
;
121 /* Every object that this function returns needs to have its refcount
122 * increased. sortCommand decreases it again. */
125 decrRefCount(keyobj
);
126 if (fieldobj
) decrRefCount(fieldobj
);
130 decrRefCount(keyobj
);
131 if (fieldlen
) decrRefCount(fieldobj
);
135 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
136 * the additional parameter is not standard but a BSD-specific we have to
137 * pass sorting parameters via the global 'server' structure */
138 int sortCompare(const void *s1
, const void *s2
) {
139 const redisSortObject
*so1
= s1
, *so2
= s2
;
142 if (!server
.sort_alpha
) {
143 /* Numeric sorting. Here it's trivial as we precomputed scores */
144 if (so1
->u
.score
> so2
->u
.score
) {
146 } else if (so1
->u
.score
< so2
->u
.score
) {
149 /* Objects have the same score, but we don't want the comparison
150 * to be undefined, so we compare objects lexicographycally.
151 * This way the result of SORT is deterministic. */
152 cmp
= compareStringObjects(so1
->obj
,so2
->obj
);
155 /* Alphanumeric sorting */
156 if (server
.sort_bypattern
) {
157 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
158 /* At least one compare object is NULL */
159 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
161 else if (so1
->u
.cmpobj
== NULL
)
166 /* We have both the objects, use strcoll */
167 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
170 /* Compare elements directly. */
171 cmp
= compareStringObjects(so1
->obj
,so2
->obj
);
174 return server
.sort_desc
? -cmp
: cmp
;
177 /* The SORT command is the most complex command in Redis. Warning: this code
178 * is optimized for speed and a bit less for readability */
179 void sortCommand(redisClient
*c
) {
181 unsigned int outputlen
= 0;
182 int desc
= 0, alpha
= 0;
183 long limit_start
= 0, limit_count
= -1, start
, end
;
184 int j
, dontsort
= 0, vectorlen
;
185 int getop
= 0; /* GET operation counter */
186 int int_convertion_error
= 0;
187 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
188 redisSortObject
*vector
; /* Resulting vector to sort */
190 /* Lookup the key to sort. It must be of the right types */
191 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
192 if (sortval
&& sortval
->type
!= REDIS_SET
&&
193 sortval
->type
!= REDIS_LIST
&&
194 sortval
->type
!= REDIS_ZSET
)
196 addReply(c
,shared
.wrongtypeerr
);
200 /* Create a list of operations to perform for every sorted element.
201 * Operations can be GET/DEL/INCR/DECR */
202 operations
= listCreate();
203 listSetFreeMethod(operations
,zfree
);
204 j
= 2; /* options start at argv[2] */
206 /* Now we need to protect sortval incrementing its count, in the future
207 * SORT may have options able to overwrite/delete keys during the sorting
208 * and the sorted key itself may get destroied */
210 incrRefCount(sortval
);
212 sortval
= createListObject();
214 /* The SORT command has an SQL-alike syntax, parse it */
216 int leftargs
= c
->argc
-j
-1;
217 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
219 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
221 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
223 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
224 if ((getLongFromObjectOrReply(c
, c
->argv
[j
+1], &limit_start
, NULL
) != REDIS_OK
) ||
225 (getLongFromObjectOrReply(c
, c
->argv
[j
+2], &limit_count
, NULL
) != REDIS_OK
)) return;
227 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
228 storekey
= c
->argv
[j
+1];
230 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
231 sortby
= c
->argv
[j
+1];
232 /* If the BY pattern does not contain '*', i.e. it is constant,
233 * we don't need to sort nor to lookup the weight keys. */
234 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
236 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
237 listAddNodeTail(operations
,createSortOperation(
238 REDIS_SORT_GET
,c
->argv
[j
+1]));
242 decrRefCount(sortval
);
243 listRelease(operations
);
244 addReply(c
,shared
.syntaxerr
);
250 /* For the STORE option, or when SORT is called from a Lua script,
251 * we want to force a specific ordering even when no explicit ordering
252 * was asked (SORT BY nosort). This guarantees that replication / AOF
255 * However in the case 'dontsort' is true, but the type to sort is a
256 * sorted set, we don't need to do anything as ordering is guaranteed
257 * in this special case. */
258 if ((storekey
|| c
->flags
& REDIS_LUA_CLIENT
) &&
259 (dontsort
&& sortval
->type
!= REDIS_ZSET
))
261 /* Force ALPHA sorting */
267 /* Destructively convert encoded sorted sets for SORT. */
268 if (sortval
->type
== REDIS_ZSET
)
269 zsetConvert(sortval
, REDIS_ENCODING_SKIPLIST
);
271 /* Objtain the length of the object to sort. */
272 switch(sortval
->type
) {
273 case REDIS_LIST
: vectorlen
= listTypeLength(sortval
); break;
274 case REDIS_SET
: vectorlen
= setTypeSize(sortval
); break;
275 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
276 default: vectorlen
= 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
279 /* Perform LIMIT start,count sanity checking. */
280 start
= (limit_start
< 0) ? 0 : limit_start
;
281 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
282 if (start
>= vectorlen
) {
286 if (end
>= vectorlen
) end
= vectorlen
-1;
290 * 1) if the object to sort is a sorted set.
291 * 2) There is nothing to sort as dontsort is true (BY <constant string>).
292 * 3) We have a LIMIT option that actually reduces the number of elements
295 * In this case to load all the objects in the vector is a huge waste of
296 * resources. We just allocate a vector that is big enough for the selected
297 * range length, and make sure to load just this part in the vector. */
298 if (sortval
->type
== REDIS_ZSET
&&
300 (start
!= 0 || end
!= vectorlen
-1))
302 vectorlen
= end
-start
+1;
305 /* Load the sorting vector with all the objects to sort */
306 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
309 if (sortval
->type
== REDIS_LIST
) {
310 listTypeIterator
*li
= listTypeInitIterator(sortval
,0,REDIS_TAIL
);
312 while(listTypeNext(li
,&entry
)) {
313 vector
[j
].obj
= listTypeGet(&entry
);
314 vector
[j
].u
.score
= 0;
315 vector
[j
].u
.cmpobj
= NULL
;
318 listTypeReleaseIterator(li
);
319 } else if (sortval
->type
== REDIS_SET
) {
320 setTypeIterator
*si
= setTypeInitIterator(sortval
);
322 while((ele
= setTypeNextObject(si
)) != NULL
) {
324 vector
[j
].u
.score
= 0;
325 vector
[j
].u
.cmpobj
= NULL
;
328 setTypeReleaseIterator(si
);
329 } else if (sortval
->type
== REDIS_ZSET
&& dontsort
) {
330 /* Special handling for a sorted set, if 'dontsort' is true.
331 * This makes sure we return elements in the sorted set original
332 * ordering, accordingly to DESC / ASC options.
334 * Note that in this case we also handle LIMIT here in a direct
335 * way, just getting the required range, as an optimization. */
337 zset
*zs
= sortval
->ptr
;
338 zskiplist
*zsl
= zs
->zsl
;
341 int rangelen
= vectorlen
;
343 /* Check if starting point is trivial, before doing log(N) lookup. */
345 long zsetlen
= dictSize(((zset
*)sortval
->ptr
)->dict
);
349 ln
= zslGetElementByRank(zsl
,zsetlen
-start
);
351 ln
= zsl
->header
->level
[0].forward
;
353 ln
= zslGetElementByRank(zsl
,start
+1);
357 redisAssertWithInfo(c
,sortval
,ln
!= NULL
);
360 vector
[j
].u
.score
= 0;
361 vector
[j
].u
.cmpobj
= NULL
;
363 ln
= desc
? ln
->backward
: ln
->level
[0].forward
;
365 /* The code producing the output does not know that in the case of
366 * sorted set, 'dontsort', and LIMIT, we are able to get just the
367 * range, already sorted, so we need to adjust "start" and "end"
368 * to make sure start is set to 0. */
371 } else if (sortval
->type
== REDIS_ZSET
) {
372 dict
*set
= ((zset
*)sortval
->ptr
)->dict
;
375 di
= dictGetIterator(set
);
376 while((setele
= dictNext(di
)) != NULL
) {
377 vector
[j
].obj
= dictGetKey(setele
);
378 vector
[j
].u
.score
= 0;
379 vector
[j
].u
.cmpobj
= NULL
;
382 dictReleaseIterator(di
);
384 redisPanic("Unknown type");
386 redisAssertWithInfo(c
,sortval
,j
== vectorlen
);
388 /* Now it's time to load the right scores in the sorting vector */
390 for (j
= 0; j
< vectorlen
; j
++) {
393 /* lookup value to sort by */
394 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
395 if (!byval
) continue;
397 /* use object itself to sort by */
398 byval
= vector
[j
].obj
;
402 if (sortby
) vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
404 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
407 vector
[j
].u
.score
= strtod(byval
->ptr
,&eptr
);
408 if (eptr
[0] != '\0' || errno
== ERANGE
||
409 isnan(vector
[j
].u
.score
))
411 int_convertion_error
= 1;
413 } else if (byval
->encoding
== REDIS_ENCODING_INT
) {
414 /* Don't need to decode the object if it's
415 * integer-encoded (the only encoding supported) so
416 * far. We can just cast it */
417 vector
[j
].u
.score
= (long)byval
->ptr
;
419 redisAssertWithInfo(c
,sortval
,1 != 1);
423 /* when the object was retrieved using lookupKeyByPattern,
424 * its refcount needs to be decreased. */
432 server
.sort_desc
= desc
;
433 server
.sort_alpha
= alpha
;
434 server
.sort_bypattern
= sortby
? 1 : 0;
435 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
436 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
438 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
441 /* Send command output to the output buffer, performing the specified
442 * GET/DEL/INCR/DECR operations if any. */
443 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
444 if (int_convertion_error
) {
445 addReplyError(c
,"One or more scores can't be converted into double");
446 } else if (storekey
== NULL
) {
447 /* STORE option not specified, sent the sorting result to client */
448 addReplyMultiBulkLen(c
,outputlen
);
449 for (j
= start
; j
<= end
; j
++) {
453 if (!getop
) addReplyBulk(c
,vector
[j
].obj
);
454 listRewind(operations
,&li
);
455 while((ln
= listNext(&li
))) {
456 redisSortOperation
*sop
= ln
->value
;
457 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
460 if (sop
->type
== REDIS_SORT_GET
) {
462 addReply(c
,shared
.nullbulk
);
469 redisAssertWithInfo(c
,sortval
,sop
->type
== REDIS_SORT_GET
);
474 robj
*sobj
= createZiplistObject();
476 /* STORE option specified, set the sorting result as a List object */
477 for (j
= start
; j
<= end
; j
++) {
482 listTypePush(sobj
,vector
[j
].obj
,REDIS_TAIL
);
484 listRewind(operations
,&li
);
485 while((ln
= listNext(&li
))) {
486 redisSortOperation
*sop
= ln
->value
;
487 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
490 if (sop
->type
== REDIS_SORT_GET
) {
491 if (!val
) val
= createStringObject("",0);
493 /* listTypePush does an incrRefCount, so we should take care
494 * care of the incremented refcount caused by either
495 * lookupKeyByPattern or createStringObject("",0) */
496 listTypePush(sobj
,val
,REDIS_TAIL
);
500 redisAssertWithInfo(c
,sortval
,sop
->type
== REDIS_SORT_GET
);
506 setKey(c
->db
,storekey
,sobj
);
507 server
.dirty
+= outputlen
;
508 } else if (dbDelete(c
->db
,storekey
)) {
509 signalModifiedKey(c
->db
,storekey
);
513 addReplyLongLong(c
,outputlen
);
517 if (sortval
->type
== REDIS_LIST
|| sortval
->type
== REDIS_SET
)
518 for (j
= 0; j
< vectorlen
; j
++)
519 decrRefCount(vector
[j
].obj
);
520 decrRefCount(sortval
);
521 listRelease(operations
);
522 for (j
= 0; j
< vectorlen
; j
++) {
523 if (alpha
&& vector
[j
].u
.cmpobj
)
524 decrRefCount(vector
[j
].u
.cmpobj
);