]>
git.saurik.com Git - redis.git/blob - src/sort.c
2 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
3 #include <math.h> /* isnan() */
5 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
6 redisSortOperation
*so
= zmalloc(sizeof(*so
));
12 /* Return the value associated to the key with a name obtained using
13 * the following rules:
15 * 1) The first occurence of '*' in 'pattern' is substituted with 'subst'.
17 * 2) If 'pattern' matches the "->" string, everything on the left of
18 * the arrow is treated as the name of an hash field, and the part on the
19 * left as the key name containing an hash. The value of the specified
22 * 3) If 'pattern' equals "#", the function simply returns 'subst' itself so
23 * that the SORT command can be used like: SORT key GET # to retrieve
24 * the Set/List elements directly.
26 * The returned object will always have its refcount increased by 1
27 * when it is non-NULL. */
28 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
31 robj
*keyobj
, *fieldobj
, *o
;
32 int prefixlen
, sublen
, postfixlen
, fieldlen
;
34 /* If the pattern is "#" return the substitution object itself in order
35 * to implement the "SORT ... GET #" feature. */
37 if (spat
[0] == '#' && spat
[1] == '\0') {
42 /* The substitution object may be specially encoded. If so we create
43 * a decoded object on the fly. Otherwise getDecodedObject will just
44 * increment the ref count, that we'll decrement later. */
45 subst
= getDecodedObject(subst
);
48 /* If we can't find '*' in the pattern we return NULL as to GET a
49 * fixed key does not make sense. */
56 /* Find out if we're dealing with a hash dereference. */
57 if ((f
= strstr(p
+1, "->")) != NULL
&& *(f
+2) != '\0') {
58 fieldlen
= sdslen(spat
)-(f
-spat
)-2;
59 fieldobj
= createStringObject(f
+2,fieldlen
);
64 /* Perform the '*' substitution. */
66 sublen
= sdslen(ssub
);
67 postfixlen
= sdslen(spat
)-(prefixlen
+1)-(fieldlen
? fieldlen
+2 : 0);
68 keyobj
= createStringObject(NULL
,prefixlen
+sublen
+postfixlen
);
70 memcpy(k
,spat
,prefixlen
);
71 memcpy(k
+prefixlen
,ssub
,sublen
);
72 memcpy(k
+prefixlen
+sublen
,p
+1,postfixlen
);
73 decrRefCount(subst
); /* Incremented by decodeObject() */
75 /* Lookup substituted key */
76 o
= lookupKeyRead(db
,keyobj
);
77 if (o
== NULL
) goto noobj
;
80 if (o
->type
!= REDIS_HASH
) goto noobj
;
82 /* Retrieve value from hash by the field name. This operation
83 * already increases the refcount of the returned object. */
84 o
= hashTypeGetObject(o
, fieldobj
);
86 if (o
->type
!= REDIS_STRING
) goto noobj
;
88 /* Every object that this function returns needs to have its refcount
89 * increased. sortCommand decreases it again. */
93 if (fieldlen
) decrRefCount(fieldobj
);
98 if (fieldlen
) decrRefCount(fieldobj
);
102 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
103 * the additional parameter is not standard but a BSD-specific we have to
104 * pass sorting parameters via the global 'server' structure */
105 int sortCompare(const void *s1
, const void *s2
) {
106 const redisSortObject
*so1
= s1
, *so2
= s2
;
109 if (!server
.sort_alpha
) {
110 /* Numeric sorting. Here it's trivial as we precomputed scores */
111 if (so1
->u
.score
> so2
->u
.score
) {
113 } else if (so1
->u
.score
< so2
->u
.score
) {
116 /* Objects have the same score, but we don't want the comparison
117 * to be undefined, so we compare objects lexicographycally.
118 * This way the result of SORT is deterministic. */
119 cmp
= compareStringObjects(so1
->obj
,so2
->obj
);
122 /* Alphanumeric sorting */
123 if (server
.sort_bypattern
) {
124 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
125 /* At least one compare object is NULL */
126 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
128 else if (so1
->u
.cmpobj
== NULL
)
133 /* We have both the objects, use strcoll */
134 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
137 /* Compare elements directly. */
138 cmp
= compareStringObjects(so1
->obj
,so2
->obj
);
141 return server
.sort_desc
? -cmp
: cmp
;
144 /* The SORT command is the most complex command in Redis. Warning: this code
145 * is optimized for speed and a bit less for readability */
146 void sortCommand(redisClient
*c
) {
148 unsigned int outputlen
= 0;
149 int desc
= 0, alpha
= 0;
150 long limit_start
= 0, limit_count
= -1, start
, end
;
151 int j
, dontsort
= 0, vectorlen
;
152 int getop
= 0; /* GET operation counter */
153 int int_convertion_error
= 0;
154 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
155 redisSortObject
*vector
; /* Resulting vector to sort */
157 /* Lookup the key to sort. It must be of the right types */
158 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
159 if (sortval
&& sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
160 sortval
->type
!= REDIS_ZSET
)
162 addReply(c
,shared
.wrongtypeerr
);
166 /* Create a list of operations to perform for every sorted element.
167 * Operations can be GET/DEL/INCR/DECR */
168 operations
= listCreate();
169 listSetFreeMethod(operations
,zfree
);
172 /* Now we need to protect sortval incrementing its count, in the future
173 * SORT may have options able to overwrite/delete keys during the sorting
174 * and the sorted key itself may get destroied */
176 incrRefCount(sortval
);
178 sortval
= createListObject();
180 /* The SORT command has an SQL-alike syntax, parse it */
182 int leftargs
= c
->argc
-j
-1;
183 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
185 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
187 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
189 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
190 if ((getLongFromObjectOrReply(c
, c
->argv
[j
+1], &limit_start
, NULL
) != REDIS_OK
) ||
191 (getLongFromObjectOrReply(c
, c
->argv
[j
+2], &limit_count
, NULL
) != REDIS_OK
)) return;
193 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
194 storekey
= c
->argv
[j
+1];
196 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
197 sortby
= c
->argv
[j
+1];
198 /* If the BY pattern does not contain '*', i.e. it is constant,
199 * we don't need to sort nor to lookup the weight keys. */
200 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
202 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
203 listAddNodeTail(operations
,createSortOperation(
204 REDIS_SORT_GET
,c
->argv
[j
+1]));
208 decrRefCount(sortval
);
209 listRelease(operations
);
210 addReply(c
,shared
.syntaxerr
);
216 /* If we have STORE we need to force sorting for deterministic output
217 * and replication. We use alpha sorting since this is guaranteed to
218 * work with any input. */
219 if (storekey
&& dontsort
) {
225 /* Destructively convert encoded sorted sets for SORT. */
226 if (sortval
->type
== REDIS_ZSET
)
227 zsetConvert(sortval
, REDIS_ENCODING_SKIPLIST
);
229 /* Load the sorting vector with all the objects to sort */
230 switch(sortval
->type
) {
231 case REDIS_LIST
: vectorlen
= listTypeLength(sortval
); break;
232 case REDIS_SET
: vectorlen
= setTypeSize(sortval
); break;
233 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
234 default: vectorlen
= 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
236 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
239 if (sortval
->type
== REDIS_LIST
) {
240 listTypeIterator
*li
= listTypeInitIterator(sortval
,0,REDIS_TAIL
);
242 while(listTypeNext(li
,&entry
)) {
243 vector
[j
].obj
= listTypeGet(&entry
);
244 vector
[j
].u
.score
= 0;
245 vector
[j
].u
.cmpobj
= NULL
;
248 listTypeReleaseIterator(li
);
249 } else if (sortval
->type
== REDIS_SET
) {
250 setTypeIterator
*si
= setTypeInitIterator(sortval
);
252 while((ele
= setTypeNextObject(si
)) != NULL
) {
254 vector
[j
].u
.score
= 0;
255 vector
[j
].u
.cmpobj
= NULL
;
258 setTypeReleaseIterator(si
);
259 } else if (sortval
->type
== REDIS_ZSET
) {
260 dict
*set
= ((zset
*)sortval
->ptr
)->dict
;
263 di
= dictGetIterator(set
);
264 while((setele
= dictNext(di
)) != NULL
) {
265 vector
[j
].obj
= dictGetKey(setele
);
266 vector
[j
].u
.score
= 0;
267 vector
[j
].u
.cmpobj
= NULL
;
270 dictReleaseIterator(di
);
272 redisPanic("Unknown type");
274 redisAssertWithInfo(c
,sortval
,j
== vectorlen
);
276 /* Now it's time to load the right scores in the sorting vector */
278 for (j
= 0; j
< vectorlen
; j
++) {
281 /* lookup value to sort by */
282 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
283 if (!byval
) continue;
285 /* use object itself to sort by */
286 byval
= vector
[j
].obj
;
290 if (sortby
) vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
292 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
295 vector
[j
].u
.score
= strtod(byval
->ptr
,&eptr
);
296 if (eptr
[0] != '\0' || errno
== ERANGE
||
297 isnan(vector
[j
].u
.score
))
299 int_convertion_error
= 1;
301 } else if (byval
->encoding
== REDIS_ENCODING_INT
) {
302 /* Don't need to decode the object if it's
303 * integer-encoded (the only encoding supported) so
304 * far. We can just cast it */
305 vector
[j
].u
.score
= (long)byval
->ptr
;
307 redisAssertWithInfo(c
,sortval
,1 != 1);
311 /* when the object was retrieved using lookupKeyByPattern,
312 * its refcount needs to be decreased. */
319 /* We are ready to sort the vector... perform a bit of sanity check
320 * on the LIMIT option too. We'll use a partial version of quicksort. */
321 start
= (limit_start
< 0) ? 0 : limit_start
;
322 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
323 if (start
>= vectorlen
) {
327 if (end
>= vectorlen
) end
= vectorlen
-1;
329 server
.sort_dontsort
= dontsort
;
331 server
.sort_desc
= desc
;
332 server
.sort_alpha
= alpha
;
333 server
.sort_bypattern
= sortby
? 1 : 0;
334 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
335 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
337 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
340 /* Send command output to the output buffer, performing the specified
341 * GET/DEL/INCR/DECR operations if any. */
342 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
343 if (int_convertion_error
) {
344 addReplyError(c
,"One or more scores can't be converted into double");
345 } else if (storekey
== NULL
) {
346 /* STORE option not specified, sent the sorting result to client */
347 addReplyMultiBulkLen(c
,outputlen
);
348 for (j
= start
; j
<= end
; j
++) {
352 if (!getop
) addReplyBulk(c
,vector
[j
].obj
);
353 listRewind(operations
,&li
);
354 while((ln
= listNext(&li
))) {
355 redisSortOperation
*sop
= ln
->value
;
356 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
359 if (sop
->type
== REDIS_SORT_GET
) {
361 addReply(c
,shared
.nullbulk
);
368 redisAssertWithInfo(c
,sortval
,sop
->type
== REDIS_SORT_GET
);
373 robj
*sobj
= createZiplistObject();
375 /* STORE option specified, set the sorting result as a List object */
376 for (j
= start
; j
<= end
; j
++) {
381 listTypePush(sobj
,vector
[j
].obj
,REDIS_TAIL
);
383 listRewind(operations
,&li
);
384 while((ln
= listNext(&li
))) {
385 redisSortOperation
*sop
= ln
->value
;
386 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
389 if (sop
->type
== REDIS_SORT_GET
) {
390 if (!val
) val
= createStringObject("",0);
392 /* listTypePush does an incrRefCount, so we should take care
393 * care of the incremented refcount caused by either
394 * lookupKeyByPattern or createStringObject("",0) */
395 listTypePush(sobj
,val
,REDIS_TAIL
);
399 redisAssertWithInfo(c
,sortval
,sop
->type
== REDIS_SORT_GET
);
405 setKey(c
->db
,storekey
,sobj
);
406 server
.dirty
+= outputlen
;
407 } else if (dbDelete(c
->db
,storekey
)) {
408 signalModifiedKey(c
->db
,storekey
);
412 addReplyLongLong(c
,outputlen
);
416 if (sortval
->type
== REDIS_LIST
|| sortval
->type
== REDIS_SET
)
417 for (j
= 0; j
< vectorlen
; j
++)
418 decrRefCount(vector
[j
].obj
);
419 decrRefCount(sortval
);
420 listRelease(operations
);
421 for (j
= 0; j
< vectorlen
; j
++) {
422 if (alpha
&& vector
[j
].u
.cmpobj
)
423 decrRefCount(vector
[j
].u
.cmpobj
);