]> git.saurik.com Git - redis.git/blame_incremental - src/sort.c
Merge pull request #544 from dvirsky/2.6
[redis.git] / src / sort.c
... / ...
CommitLineData
1#include "redis.h"
2#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
3#include <math.h> /* isnan() */
4
5zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank);
6
7redisSortOperation *createSortOperation(int type, robj *pattern) {
8 redisSortOperation *so = zmalloc(sizeof(*so));
9 so->type = type;
10 so->pattern = pattern;
11 return so;
12}
13
14/* Return the value associated to the key with a name obtained using
15 * the following rules:
16 *
17 * 1) The first occurence of '*' in 'pattern' is substituted with 'subst'.
18 *
19 * 2) If 'pattern' matches the "->" string, everything on the left of
20 * the arrow is treated as the name of an hash field, and the part on the
21 * left as the key name containing an hash. The value of the specified
22 * field is returned.
23 *
24 * 3) If 'pattern' equals "#", the function simply returns 'subst' itself so
25 * that the SORT command can be used like: SORT key GET # to retrieve
26 * the Set/List elements directly.
27 *
28 * The returned object will always have its refcount increased by 1
29 * when it is non-NULL. */
30robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
31 char *p, *f, *k;
32 sds spat, ssub;
33 robj *keyobj, *fieldobj = NULL, *o;
34 int prefixlen, sublen, postfixlen, fieldlen;
35
36 /* If the pattern is "#" return the substitution object itself in order
37 * to implement the "SORT ... GET #" feature. */
38 spat = pattern->ptr;
39 if (spat[0] == '#' && spat[1] == '\0') {
40 incrRefCount(subst);
41 return subst;
42 }
43
44 /* The substitution object may be specially encoded. If so we create
45 * a decoded object on the fly. Otherwise getDecodedObject will just
46 * increment the ref count, that we'll decrement later. */
47 subst = getDecodedObject(subst);
48 ssub = subst->ptr;
49
50 /* If we can't find '*' in the pattern we return NULL as to GET a
51 * fixed key does not make sense. */
52 p = strchr(spat,'*');
53 if (!p) {
54 decrRefCount(subst);
55 return NULL;
56 }
57
58 /* Find out if we're dealing with a hash dereference. */
59 if ((f = strstr(p+1, "->")) != NULL && *(f+2) != '\0') {
60 fieldlen = sdslen(spat)-(f-spat)-2;
61 fieldobj = createStringObject(f+2,fieldlen);
62 } else {
63 fieldlen = 0;
64 }
65
66 /* Perform the '*' substitution. */
67 prefixlen = p-spat;
68 sublen = sdslen(ssub);
69 postfixlen = sdslen(spat)-(prefixlen+1)-(fieldlen ? fieldlen+2 : 0);
70 keyobj = createStringObject(NULL,prefixlen+sublen+postfixlen);
71 k = keyobj->ptr;
72 memcpy(k,spat,prefixlen);
73 memcpy(k+prefixlen,ssub,sublen);
74 memcpy(k+prefixlen+sublen,p+1,postfixlen);
75 decrRefCount(subst); /* Incremented by decodeObject() */
76
77 /* Lookup substituted key */
78 o = lookupKeyRead(db,keyobj);
79 if (o == NULL) goto noobj;
80
81 if (fieldobj) {
82 if (o->type != REDIS_HASH) goto noobj;
83
84 /* Retrieve value from hash by the field name. This operation
85 * already increases the refcount of the returned object. */
86 o = hashTypeGetObject(o, fieldobj);
87 } else {
88 if (o->type != REDIS_STRING) goto noobj;
89
90 /* Every object that this function returns needs to have its refcount
91 * increased. sortCommand decreases it again. */
92 incrRefCount(o);
93 }
94 decrRefCount(keyobj);
95 if (fieldobj) decrRefCount(fieldobj);
96 return o;
97
98noobj:
99 decrRefCount(keyobj);
100 if (fieldlen) decrRefCount(fieldobj);
101 return NULL;
102}
103
104/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
105 * the additional parameter is not standard but a BSD-specific we have to
106 * pass sorting parameters via the global 'server' structure */
107int sortCompare(const void *s1, const void *s2) {
108 const redisSortObject *so1 = s1, *so2 = s2;
109 int cmp;
110
111 if (!server.sort_alpha) {
112 /* Numeric sorting. Here it's trivial as we precomputed scores */
113 if (so1->u.score > so2->u.score) {
114 cmp = 1;
115 } else if (so1->u.score < so2->u.score) {
116 cmp = -1;
117 } else {
118 /* Objects have the same score, but we don't want the comparison
119 * to be undefined, so we compare objects lexicographycally.
120 * This way the result of SORT is deterministic. */
121 cmp = compareStringObjects(so1->obj,so2->obj);
122 }
123 } else {
124 /* Alphanumeric sorting */
125 if (server.sort_bypattern) {
126 if (!so1->u.cmpobj || !so2->u.cmpobj) {
127 /* At least one compare object is NULL */
128 if (so1->u.cmpobj == so2->u.cmpobj)
129 cmp = 0;
130 else if (so1->u.cmpobj == NULL)
131 cmp = -1;
132 else
133 cmp = 1;
134 } else {
135 /* We have both the objects, use strcoll */
136 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
137 }
138 } else {
139 /* Compare elements directly. */
140 cmp = compareStringObjects(so1->obj,so2->obj);
141 }
142 }
143 return server.sort_desc ? -cmp : cmp;
144}
145
146/* The SORT command is the most complex command in Redis. Warning: this code
147 * is optimized for speed and a bit less for readability */
148void sortCommand(redisClient *c) {
149 list *operations;
150 unsigned int outputlen = 0;
151 int desc = 0, alpha = 0;
152 long limit_start = 0, limit_count = -1, start, end;
153 int j, dontsort = 0, vectorlen;
154 int getop = 0; /* GET operation counter */
155 int int_convertion_error = 0;
156 robj *sortval, *sortby = NULL, *storekey = NULL;
157 redisSortObject *vector; /* Resulting vector to sort */
158
159 /* Lookup the key to sort. It must be of the right types */
160 sortval = lookupKeyRead(c->db,c->argv[1]);
161 if (sortval && sortval->type != REDIS_SET &&
162 sortval->type != REDIS_LIST &&
163 sortval->type != REDIS_ZSET)
164 {
165 addReply(c,shared.wrongtypeerr);
166 return;
167 }
168
169 /* Create a list of operations to perform for every sorted element.
170 * Operations can be GET/DEL/INCR/DECR */
171 operations = listCreate();
172 listSetFreeMethod(operations,zfree);
173 j = 2; /* options start at argv[2] */
174
175 /* Now we need to protect sortval incrementing its count, in the future
176 * SORT may have options able to overwrite/delete keys during the sorting
177 * and the sorted key itself may get destroied */
178 if (sortval)
179 incrRefCount(sortval);
180 else
181 sortval = createListObject();
182
183 /* The SORT command has an SQL-alike syntax, parse it */
184 while(j < c->argc) {
185 int leftargs = c->argc-j-1;
186 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
187 desc = 0;
188 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
189 desc = 1;
190 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
191 alpha = 1;
192 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
193 if ((getLongFromObjectOrReply(c, c->argv[j+1], &limit_start, NULL) != REDIS_OK) ||
194 (getLongFromObjectOrReply(c, c->argv[j+2], &limit_count, NULL) != REDIS_OK)) return;
195 j+=2;
196 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
197 storekey = c->argv[j+1];
198 j++;
199 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
200 sortby = c->argv[j+1];
201 /* If the BY pattern does not contain '*', i.e. it is constant,
202 * we don't need to sort nor to lookup the weight keys. */
203 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
204 j++;
205 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
206 listAddNodeTail(operations,createSortOperation(
207 REDIS_SORT_GET,c->argv[j+1]));
208 getop++;
209 j++;
210 } else {
211 decrRefCount(sortval);
212 listRelease(operations);
213 addReply(c,shared.syntaxerr);
214 return;
215 }
216 j++;
217 }
218
219 /* For the STORE option, or when SORT is called from a Lua script,
220 * we want to force a specific ordering even when no explicit ordering
221 * was asked (SORT BY nosort). This guarantees that replication / AOF
222 * is deterministic.
223 *
224 * However in the case 'dontsort' is true, but the type to sort is a
225 * sorted set, we don't need to do anything as ordering is guaranteed
226 * in this special case. */
227 if ((storekey || c->flags & REDIS_LUA_CLIENT) &&
228 (dontsort && sortval->type != REDIS_ZSET))
229 {
230 /* Force ALPHA sorting */
231 dontsort = 0;
232 alpha = 1;
233 sortby = NULL;
234 }
235
236 /* Destructively convert encoded sorted sets for SORT. */
237 if (sortval->type == REDIS_ZSET)
238 zsetConvert(sortval, REDIS_ENCODING_SKIPLIST);
239
240 /* Objtain the length of the object to sort. */
241 switch(sortval->type) {
242 case REDIS_LIST: vectorlen = listTypeLength(sortval); break;
243 case REDIS_SET: vectorlen = setTypeSize(sortval); break;
244 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
245 default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
246 }
247
248 /* Perform LIMIT start,count sanity checking. */
249 start = (limit_start < 0) ? 0 : limit_start;
250 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
251 if (start >= vectorlen) {
252 start = vectorlen-1;
253 end = vectorlen-2;
254 }
255 if (end >= vectorlen) end = vectorlen-1;
256
257 /* Optimization:
258 *
259 * 1) if the object to sort is a sorted set.
260 * 2) There is nothing to sort as dontsort is true (BY <constant string>).
261 * 3) We have a LIMIT option that actually reduces the number of elements
262 * to fetch.
263 *
264 * In this case to load all the objects in the vector is a huge waste of
265 * resources. We just allocate a vector that is big enough for the selected
266 * range length, and make sure to load just this part in the vector. */
267 if (sortval->type == REDIS_ZSET &&
268 dontsort &&
269 (start != 0 || end != vectorlen-1))
270 {
271 vectorlen = end-start+1;
272 }
273
274 /* Load the sorting vector with all the objects to sort */
275 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
276 j = 0;
277
278 if (sortval->type == REDIS_LIST) {
279 listTypeIterator *li = listTypeInitIterator(sortval,0,REDIS_TAIL);
280 listTypeEntry entry;
281 while(listTypeNext(li,&entry)) {
282 vector[j].obj = listTypeGet(&entry);
283 vector[j].u.score = 0;
284 vector[j].u.cmpobj = NULL;
285 j++;
286 }
287 listTypeReleaseIterator(li);
288 } else if (sortval->type == REDIS_SET) {
289 setTypeIterator *si = setTypeInitIterator(sortval);
290 robj *ele;
291 while((ele = setTypeNextObject(si)) != NULL) {
292 vector[j].obj = ele;
293 vector[j].u.score = 0;
294 vector[j].u.cmpobj = NULL;
295 j++;
296 }
297 setTypeReleaseIterator(si);
298 } else if (sortval->type == REDIS_ZSET && dontsort) {
299 /* Special handling for a sorted set, if 'dontsort' is true.
300 * This makes sure we return elements in the sorted set original
301 * ordering, accordingly to DESC / ASC options.
302 *
303 * Note that in this case we also handle LIMIT here in a direct
304 * way, just getting the required range, as an optimization. */
305
306 zset *zs = sortval->ptr;
307 zskiplist *zsl = zs->zsl;
308 zskiplistNode *ln;
309 robj *ele;
310 int rangelen = vectorlen;
311
312 /* Check if starting point is trivial, before doing log(N) lookup. */
313 if (desc) {
314 long zsetlen = dictSize(((zset*)sortval->ptr)->dict);
315
316 ln = zsl->tail;
317 if (start > 0)
318 ln = zslGetElementByRank(zsl,zsetlen-start);
319 } else {
320 ln = zsl->header->level[0].forward;
321 if (start > 0)
322 ln = zslGetElementByRank(zsl,start+1);
323 }
324
325 while(rangelen--) {
326 redisAssertWithInfo(c,sortval,ln != NULL);
327 ele = ln->obj;
328 vector[j].obj = ele;
329 vector[j].u.score = 0;
330 vector[j].u.cmpobj = NULL;
331 j++;
332 ln = desc ? ln->backward : ln->level[0].forward;
333 }
334 /* The code producing the output does not know that in the case of
335 * sorted set, 'dontsort', and LIMIT, we are able to get just the
336 * range, already sorted, so we need to adjust "start" and "end"
337 * to make sure start is set to 0. */
338 end -= start;
339 start = 0;
340 } else if (sortval->type == REDIS_ZSET) {
341 dict *set = ((zset*)sortval->ptr)->dict;
342 dictIterator *di;
343 dictEntry *setele;
344 di = dictGetIterator(set);
345 while((setele = dictNext(di)) != NULL) {
346 vector[j].obj = dictGetKey(setele);
347 vector[j].u.score = 0;
348 vector[j].u.cmpobj = NULL;
349 j++;
350 }
351 dictReleaseIterator(di);
352 } else {
353 redisPanic("Unknown type");
354 }
355 redisAssertWithInfo(c,sortval,j == vectorlen);
356
357 /* Now it's time to load the right scores in the sorting vector */
358 if (dontsort == 0) {
359 for (j = 0; j < vectorlen; j++) {
360 robj *byval;
361 if (sortby) {
362 /* lookup value to sort by */
363 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
364 if (!byval) continue;
365 } else {
366 /* use object itself to sort by */
367 byval = vector[j].obj;
368 }
369
370 if (alpha) {
371 if (sortby) vector[j].u.cmpobj = getDecodedObject(byval);
372 } else {
373 if (byval->encoding == REDIS_ENCODING_RAW) {
374 char *eptr;
375
376 vector[j].u.score = strtod(byval->ptr,&eptr);
377 if (eptr[0] != '\0' || errno == ERANGE ||
378 isnan(vector[j].u.score))
379 {
380 int_convertion_error = 1;
381 }
382 } else if (byval->encoding == REDIS_ENCODING_INT) {
383 /* Don't need to decode the object if it's
384 * integer-encoded (the only encoding supported) so
385 * far. We can just cast it */
386 vector[j].u.score = (long)byval->ptr;
387 } else {
388 redisAssertWithInfo(c,sortval,1 != 1);
389 }
390 }
391
392 /* when the object was retrieved using lookupKeyByPattern,
393 * its refcount needs to be decreased. */
394 if (sortby) {
395 decrRefCount(byval);
396 }
397 }
398 }
399
400 if (dontsort == 0) {
401 server.sort_desc = desc;
402 server.sort_alpha = alpha;
403 server.sort_bypattern = sortby ? 1 : 0;
404 if (sortby && (start != 0 || end != vectorlen-1))
405 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
406 else
407 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
408 }
409
410 /* Send command output to the output buffer, performing the specified
411 * GET/DEL/INCR/DECR operations if any. */
412 outputlen = getop ? getop*(end-start+1) : end-start+1;
413 if (int_convertion_error) {
414 addReplyError(c,"One or more scores can't be converted into double");
415 } else if (storekey == NULL) {
416 /* STORE option not specified, sent the sorting result to client */
417 addReplyMultiBulkLen(c,outputlen);
418 for (j = start; j <= end; j++) {
419 listNode *ln;
420 listIter li;
421
422 if (!getop) addReplyBulk(c,vector[j].obj);
423 listRewind(operations,&li);
424 while((ln = listNext(&li))) {
425 redisSortOperation *sop = ln->value;
426 robj *val = lookupKeyByPattern(c->db,sop->pattern,
427 vector[j].obj);
428
429 if (sop->type == REDIS_SORT_GET) {
430 if (!val) {
431 addReply(c,shared.nullbulk);
432 } else {
433 addReplyBulk(c,val);
434 decrRefCount(val);
435 }
436 } else {
437 /* Always fails */
438 redisAssertWithInfo(c,sortval,sop->type == REDIS_SORT_GET);
439 }
440 }
441 }
442 } else {
443 robj *sobj = createZiplistObject();
444
445 /* STORE option specified, set the sorting result as a List object */
446 for (j = start; j <= end; j++) {
447 listNode *ln;
448 listIter li;
449
450 if (!getop) {
451 listTypePush(sobj,vector[j].obj,REDIS_TAIL);
452 } else {
453 listRewind(operations,&li);
454 while((ln = listNext(&li))) {
455 redisSortOperation *sop = ln->value;
456 robj *val = lookupKeyByPattern(c->db,sop->pattern,
457 vector[j].obj);
458
459 if (sop->type == REDIS_SORT_GET) {
460 if (!val) val = createStringObject("",0);
461
462 /* listTypePush does an incrRefCount, so we should take care
463 * care of the incremented refcount caused by either
464 * lookupKeyByPattern or createStringObject("",0) */
465 listTypePush(sobj,val,REDIS_TAIL);
466 decrRefCount(val);
467 } else {
468 /* Always fails */
469 redisAssertWithInfo(c,sortval,sop->type == REDIS_SORT_GET);
470 }
471 }
472 }
473 }
474 if (outputlen) {
475 setKey(c->db,storekey,sobj);
476 server.dirty += outputlen;
477 } else if (dbDelete(c->db,storekey)) {
478 signalModifiedKey(c->db,storekey);
479 server.dirty++;
480 }
481 decrRefCount(sobj);
482 addReplyLongLong(c,outputlen);
483 }
484
485 /* Cleanup */
486 if (sortval->type == REDIS_LIST || sortval->type == REDIS_SET)
487 for (j = 0; j < vectorlen; j++)
488 decrRefCount(vector[j].obj);
489 decrRefCount(sortval);
490 listRelease(operations);
491 for (j = 0; j < vectorlen; j++) {
492 if (alpha && vector[j].u.cmpobj)
493 decrRefCount(vector[j].u.cmpobj);
494 }
495 zfree(vector);
496}
497
498