]>
git.saurik.com Git - redis.git/blob - src/sort.c
1a3fecb6f742483fc262ed1edd81a488676aa524
2 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
4 redisSortOperation
*createSortOperation(int type
, robj
*pattern
) {
5 redisSortOperation
*so
= zmalloc(sizeof(*so
));
11 /* Return the value associated to the key with a name obtained
12 * substituting the first occurence of '*' in 'pattern' with 'subst'.
13 * The returned object will always have its refcount increased by 1
14 * when it is non-NULL. */
15 robj
*lookupKeyByPattern(redisDb
*db
, robj
*pattern
, robj
*subst
) {
18 robj keyobj
, fieldobj
, *o
;
19 int prefixlen
, sublen
, postfixlen
, fieldlen
;
20 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
24 char buf
[REDIS_SORTKEY_MAX
+1];
27 /* If the pattern is "#" return the substitution object itself in order
28 * to implement the "SORT ... GET #" feature. */
30 if (spat
[0] == '#' && spat
[1] == '\0') {
35 /* The substitution object may be specially encoded. If so we create
36 * a decoded object on the fly. Otherwise getDecodedObject will just
37 * increment the ref count, that we'll decrement later. */
38 subst
= getDecodedObject(subst
);
41 if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
;
48 /* Find out if we're dealing with a hash dereference. */
49 if ((f
= strstr(p
+1, "->")) != NULL
) {
50 fieldlen
= sdslen(spat
)-(f
-spat
);
51 /* this also copies \0 character */
52 memcpy(fieldname
.buf
,f
+2,fieldlen
-1);
53 fieldname
.len
= fieldlen
-2;
59 sublen
= sdslen(ssub
);
60 postfixlen
= sdslen(spat
)-(prefixlen
+1)-fieldlen
;
61 memcpy(keyname
.buf
,spat
,prefixlen
);
62 memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
);
63 memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
);
64 keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0';
65 keyname
.len
= prefixlen
+sublen
+postfixlen
;
68 /* Lookup substituted key */
69 initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(struct sdshdr
)));
70 o
= lookupKeyRead(db
,&keyobj
);
71 if (o
== NULL
) return NULL
;
74 if (o
->type
!= REDIS_HASH
|| fieldname
.len
< 1) return NULL
;
76 /* Retrieve value from hash by the field name. This operation
77 * already increases the refcount of the returned object. */
78 initStaticStringObject(fieldobj
,((char*)&fieldname
)+(sizeof(struct sdshdr
)));
79 o
= hashTypeGetObject(o
, &fieldobj
);
81 if (o
->type
!= REDIS_STRING
) return NULL
;
83 /* Every object that this function returns needs to have its refcount
84 * increased. sortCommand decreases it again. */
91 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
92 * the additional parameter is not standard but a BSD-specific we have to
93 * pass sorting parameters via the global 'server' structure */
94 int sortCompare(const void *s1
, const void *s2
) {
95 const redisSortObject
*so1
= s1
, *so2
= s2
;
98 if (!server
.sort_alpha
) {
99 /* Numeric sorting. Here it's trivial as we precomputed scores */
100 if (so1
->u
.score
> so2
->u
.score
) {
102 } else if (so1
->u
.score
< so2
->u
.score
) {
108 /* Alphanumeric sorting */
109 if (server
.sort_bypattern
) {
110 if (!so1
->u
.cmpobj
|| !so2
->u
.cmpobj
) {
111 /* At least one compare object is NULL */
112 if (so1
->u
.cmpobj
== so2
->u
.cmpobj
)
114 else if (so1
->u
.cmpobj
== NULL
)
119 /* We have both the objects, use strcoll */
120 cmp
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
);
123 /* Compare elements directly. */
124 cmp
= compareStringObjects(so1
->obj
,so2
->obj
);
127 return server
.sort_desc
? -cmp
: cmp
;
130 /* The SORT command is the most complex command in Redis. Warning: this code
131 * is optimized for speed and a bit less for readability */
132 void sortCommand(redisClient
*c
) {
134 unsigned int outputlen
= 0;
135 int desc
= 0, alpha
= 0;
136 int limit_start
= 0, limit_count
= -1, start
, end
;
137 int j
, dontsort
= 0, vectorlen
;
138 int getop
= 0; /* GET operation counter */
139 robj
*sortval
, *sortby
= NULL
, *storekey
= NULL
;
140 redisSortObject
*vector
; /* Resulting vector to sort */
142 /* Lookup the key to sort. It must be of the right types */
143 sortval
= lookupKeyRead(c
->db
,c
->argv
[1]);
144 if (sortval
== NULL
) {
145 addReply(c
,shared
.emptymultibulk
);
148 if (sortval
->type
!= REDIS_SET
&& sortval
->type
!= REDIS_LIST
&&
149 sortval
->type
!= REDIS_ZSET
)
151 addReply(c
,shared
.wrongtypeerr
);
155 /* Create a list of operations to perform for every sorted element.
156 * Operations can be GET/DEL/INCR/DECR */
157 operations
= listCreate();
158 listSetFreeMethod(operations
,zfree
);
161 /* Now we need to protect sortval incrementing its count, in the future
162 * SORT may have options able to overwrite/delete keys during the sorting
163 * and the sorted key itself may get destroied */
164 incrRefCount(sortval
);
166 /* The SORT command has an SQL-alike syntax, parse it */
168 int leftargs
= c
->argc
-j
-1;
169 if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) {
171 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) {
173 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) {
175 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs
>= 2) {
176 limit_start
= atoi(c
->argv
[j
+1]->ptr
);
177 limit_count
= atoi(c
->argv
[j
+2]->ptr
);
179 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs
>= 1) {
180 storekey
= c
->argv
[j
+1];
182 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs
>= 1) {
183 sortby
= c
->argv
[j
+1];
184 /* If the BY pattern does not contain '*', i.e. it is constant,
185 * we don't need to sort nor to lookup the weight keys. */
186 if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort
= 1;
188 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs
>= 1) {
189 listAddNodeTail(operations
,createSortOperation(
190 REDIS_SORT_GET
,c
->argv
[j
+1]));
194 decrRefCount(sortval
);
195 listRelease(operations
);
196 addReply(c
,shared
.syntaxerr
);
202 /* Destructively convert encoded sorted sets for SORT. */
203 if (sortval
->type
== REDIS_ZSET
) zsetConvert(sortval
, REDIS_ENCODING_RAW
);
205 /* Load the sorting vector with all the objects to sort */
206 switch(sortval
->type
) {
207 case REDIS_LIST
: vectorlen
= listTypeLength(sortval
); break;
208 case REDIS_SET
: vectorlen
= setTypeSize(sortval
); break;
209 case REDIS_ZSET
: vectorlen
= dictSize(((zset
*)sortval
->ptr
)->dict
); break;
210 default: vectorlen
= 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
212 vector
= zmalloc(sizeof(redisSortObject
)*vectorlen
);
215 if (sortval
->type
== REDIS_LIST
) {
216 listTypeIterator
*li
= listTypeInitIterator(sortval
,0,REDIS_TAIL
);
218 while(listTypeNext(li
,&entry
)) {
219 vector
[j
].obj
= listTypeGet(&entry
);
220 vector
[j
].u
.score
= 0;
221 vector
[j
].u
.cmpobj
= NULL
;
224 listTypeReleaseIterator(li
);
225 } else if (sortval
->type
== REDIS_SET
) {
226 setTypeIterator
*si
= setTypeInitIterator(sortval
);
228 while((ele
= setTypeNextObject(si
)) != NULL
) {
230 vector
[j
].u
.score
= 0;
231 vector
[j
].u
.cmpobj
= NULL
;
234 setTypeReleaseIterator(si
);
235 } else if (sortval
->type
== REDIS_ZSET
) {
236 dict
*set
= ((zset
*)sortval
->ptr
)->dict
;
239 di
= dictGetIterator(set
);
240 while((setele
= dictNext(di
)) != NULL
) {
241 vector
[j
].obj
= dictGetEntryKey(setele
);
242 vector
[j
].u
.score
= 0;
243 vector
[j
].u
.cmpobj
= NULL
;
246 dictReleaseIterator(di
);
248 redisPanic("Unknown type");
250 redisAssert(j
== vectorlen
);
252 /* Now it's time to load the right scores in the sorting vector */
254 for (j
= 0; j
< vectorlen
; j
++) {
257 /* lookup value to sort by */
258 byval
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
);
259 if (!byval
) continue;
261 /* use object itself to sort by */
262 byval
= vector
[j
].obj
;
266 if (sortby
) vector
[j
].u
.cmpobj
= getDecodedObject(byval
);
268 if (byval
->encoding
== REDIS_ENCODING_RAW
) {
269 vector
[j
].u
.score
= strtod(byval
->ptr
,NULL
);
270 } else if (byval
->encoding
== REDIS_ENCODING_INT
) {
271 /* Don't need to decode the object if it's
272 * integer-encoded (the only encoding supported) so
273 * far. We can just cast it */
274 vector
[j
].u
.score
= (long)byval
->ptr
;
280 /* when the object was retrieved using lookupKeyByPattern,
281 * its refcount needs to be decreased. */
288 /* We are ready to sort the vector... perform a bit of sanity check
289 * on the LIMIT option too. We'll use a partial version of quicksort. */
290 start
= (limit_start
< 0) ? 0 : limit_start
;
291 end
= (limit_count
< 0) ? vectorlen
-1 : start
+limit_count
-1;
292 if (start
>= vectorlen
) {
296 if (end
>= vectorlen
) end
= vectorlen
-1;
299 server
.sort_desc
= desc
;
300 server
.sort_alpha
= alpha
;
301 server
.sort_bypattern
= sortby
? 1 : 0;
302 if (sortby
&& (start
!= 0 || end
!= vectorlen
-1))
303 pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
);
305 qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
);
308 /* Send command output to the output buffer, performing the specified
309 * GET/DEL/INCR/DECR operations if any. */
310 outputlen
= getop
? getop
*(end
-start
+1) : end
-start
+1;
311 if (storekey
== NULL
) {
312 /* STORE option not specified, sent the sorting result to client */
313 addReplyMultiBulkLen(c
,outputlen
);
314 for (j
= start
; j
<= end
; j
++) {
318 if (!getop
) addReplyBulk(c
,vector
[j
].obj
);
319 listRewind(operations
,&li
);
320 while((ln
= listNext(&li
))) {
321 redisSortOperation
*sop
= ln
->value
;
322 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
325 if (sop
->type
== REDIS_SORT_GET
) {
327 addReply(c
,shared
.nullbulk
);
333 redisAssert(sop
->type
== REDIS_SORT_GET
); /* always fails */
338 robj
*sobj
= createZiplistObject();
340 /* STORE option specified, set the sorting result as a List object */
341 for (j
= start
; j
<= end
; j
++) {
346 listTypePush(sobj
,vector
[j
].obj
,REDIS_TAIL
);
348 listRewind(operations
,&li
);
349 while((ln
= listNext(&li
))) {
350 redisSortOperation
*sop
= ln
->value
;
351 robj
*val
= lookupKeyByPattern(c
->db
,sop
->pattern
,
354 if (sop
->type
== REDIS_SORT_GET
) {
355 if (!val
) val
= createStringObject("",0);
357 /* listTypePush does an incrRefCount, so we should take care
358 * care of the incremented refcount caused by either
359 * lookupKeyByPattern or createStringObject("",0) */
360 listTypePush(sobj
,val
,REDIS_TAIL
);
364 redisAssert(sop
->type
== REDIS_SORT_GET
);
369 dbReplace(c
->db
,storekey
,sobj
);
370 /* Note: we add 1 because the DB is dirty anyway since even if the
371 * SORT result is empty a new key is set and maybe the old content
373 server
.dirty
+= 1+outputlen
;
374 signalModifiedKey(c
->db
,storekey
);
375 addReplyLongLong(c
,outputlen
);
379 if (sortval
->type
== REDIS_LIST
|| sortval
->type
== REDIS_SET
)
380 for (j
= 0; j
< vectorlen
; j
++)
381 decrRefCount(vector
[j
].obj
);
382 decrRefCount(sortval
);
383 listRelease(operations
);
384 for (j
= 0; j
< vectorlen
; j
++) {
385 if (alpha
&& vector
[j
].u
.cmpobj
)
386 decrRefCount(vector
[j
].u
.cmpobj
);