]> git.saurik.com Git - redis.git/blob - src/sort.c
11b73ad389d69f4482d106885875ff7be71d6068
[redis.git] / src / sort.c
1 #include "redis.h"
2 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
3 #include <math.h> /* isnan() */
4
5 redisSortOperation *createSortOperation(int type, robj *pattern) {
6 redisSortOperation *so = zmalloc(sizeof(*so));
7 so->type = type;
8 so->pattern = pattern;
9 return so;
10 }
11
12 /* Return the value associated to the key with a name obtained
13 * substituting the first occurence of '*' in 'pattern' with 'subst'.
14 * The returned object will always have its refcount increased by 1
15 * when it is non-NULL. */
16 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
17 char *p, *f;
18 sds spat, ssub;
19 robj keyobj, fieldobj, *o;
20 int prefixlen, sublen, postfixlen, fieldlen;
21 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
22 struct {
23 int len;
24 int free;
25 char buf[REDIS_SORTKEY_MAX+1];
26 } keyname, fieldname;
27
28 /* If the pattern is "#" return the substitution object itself in order
29 * to implement the "SORT ... GET #" feature. */
30 spat = pattern->ptr;
31 if (spat[0] == '#' && spat[1] == '\0') {
32 incrRefCount(subst);
33 return subst;
34 }
35
36 /* The substitution object may be specially encoded. If so we create
37 * a decoded object on the fly. Otherwise getDecodedObject will just
38 * increment the ref count, that we'll decrement later. */
39 subst = getDecodedObject(subst);
40
41 ssub = subst->ptr;
42 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
43 p = strchr(spat,'*');
44 if (!p) {
45 decrRefCount(subst);
46 return NULL;
47 }
48
49 /* Find out if we're dealing with a hash dereference. */
50 if ((f = strstr(p+1, "->")) != NULL) {
51 fieldlen = sdslen(spat)-(f-spat);
52 /* this also copies \0 character */
53 memcpy(fieldname.buf,f+2,fieldlen-1);
54 fieldname.len = fieldlen-2;
55 } else {
56 fieldlen = 0;
57 }
58
59 prefixlen = p-spat;
60 sublen = sdslen(ssub);
61 postfixlen = sdslen(spat)-(prefixlen+1)-fieldlen;
62 memcpy(keyname.buf,spat,prefixlen);
63 memcpy(keyname.buf+prefixlen,ssub,sublen);
64 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
65 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
66 keyname.len = prefixlen+sublen+postfixlen;
67 decrRefCount(subst);
68
69 /* Lookup substituted key */
70 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(struct sdshdr)));
71 o = lookupKeyRead(db,&keyobj);
72 if (o == NULL) return NULL;
73
74 if (fieldlen > 0) {
75 if (o->type != REDIS_HASH || fieldname.len < 1) return NULL;
76
77 /* Retrieve value from hash by the field name. This operation
78 * already increases the refcount of the returned object. */
79 initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(struct sdshdr)));
80 o = hashTypeGetObject(o, &fieldobj);
81 } else {
82 if (o->type != REDIS_STRING) return NULL;
83
84 /* Every object that this function returns needs to have its refcount
85 * increased. sortCommand decreases it again. */
86 incrRefCount(o);
87 }
88
89 return o;
90 }
91
92 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
93 * the additional parameter is not standard but a BSD-specific we have to
94 * pass sorting parameters via the global 'server' structure */
95 int sortCompare(const void *s1, const void *s2) {
96 const redisSortObject *so1 = s1, *so2 = s2;
97 int cmp;
98
99 if (!server.sort_alpha) {
100 /* Numeric sorting. Here it's trivial as we precomputed scores */
101 if (so1->u.score > so2->u.score) {
102 cmp = 1;
103 } else if (so1->u.score < so2->u.score) {
104 cmp = -1;
105 } else {
106 /* Objects have the same score, but we don't want the comparison
107 * to be undefined, so we compare objects lexicographycally.
108 * This way the result of SORT is deterministic. */
109 cmp = compareStringObjects(so1->obj,so2->obj);
110 }
111 } else {
112 /* Alphanumeric sorting */
113 if (server.sort_bypattern) {
114 if (!so1->u.cmpobj || !so2->u.cmpobj) {
115 /* At least one compare object is NULL */
116 if (so1->u.cmpobj == so2->u.cmpobj)
117 cmp = 0;
118 else if (so1->u.cmpobj == NULL)
119 cmp = -1;
120 else
121 cmp = 1;
122 } else {
123 /* We have both the objects, use strcoll */
124 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
125 }
126 } else {
127 /* Compare elements directly. */
128 cmp = compareStringObjects(so1->obj,so2->obj);
129 }
130 }
131 return server.sort_desc ? -cmp : cmp;
132 }
133
134 /* The SORT command is the most complex command in Redis. Warning: this code
135 * is optimized for speed and a bit less for readability */
136 void sortCommand(redisClient *c) {
137 list *operations;
138 unsigned int outputlen = 0;
139 int desc = 0, alpha = 0;
140 long limit_start = 0, limit_count = -1, start, end;
141 int j, dontsort = 0, vectorlen;
142 int getop = 0; /* GET operation counter */
143 int int_convertion_error = 0;
144 robj *sortval, *sortby = NULL, *storekey = NULL;
145 redisSortObject *vector; /* Resulting vector to sort */
146
147 /* Lookup the key to sort. It must be of the right types */
148 sortval = lookupKeyRead(c->db,c->argv[1]);
149 if (sortval && sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
150 sortval->type != REDIS_ZSET)
151 {
152 addReply(c,shared.wrongtypeerr);
153 return;
154 }
155
156 /* Create a list of operations to perform for every sorted element.
157 * Operations can be GET/DEL/INCR/DECR */
158 operations = listCreate();
159 listSetFreeMethod(operations,zfree);
160 j = 2;
161
162 /* Now we need to protect sortval incrementing its count, in the future
163 * SORT may have options able to overwrite/delete keys during the sorting
164 * and the sorted key itself may get destroied */
165 if (sortval)
166 incrRefCount(sortval);
167 else
168 sortval = createListObject();
169
170 /* The SORT command has an SQL-alike syntax, parse it */
171 while(j < c->argc) {
172 int leftargs = c->argc-j-1;
173 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
174 desc = 0;
175 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
176 desc = 1;
177 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
178 alpha = 1;
179 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
180 if ((getLongFromObjectOrReply(c, c->argv[j+1], &limit_start, NULL) != REDIS_OK) ||
181 (getLongFromObjectOrReply(c, c->argv[j+2], &limit_count, NULL) != REDIS_OK)) return;
182 j+=2;
183 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
184 storekey = c->argv[j+1];
185 j++;
186 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
187 sortby = c->argv[j+1];
188 /* If the BY pattern does not contain '*', i.e. it is constant,
189 * we don't need to sort nor to lookup the weight keys. */
190 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
191 j++;
192 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
193 listAddNodeTail(operations,createSortOperation(
194 REDIS_SORT_GET,c->argv[j+1]));
195 getop++;
196 j++;
197 } else {
198 decrRefCount(sortval);
199 listRelease(operations);
200 addReply(c,shared.syntaxerr);
201 return;
202 }
203 j++;
204 }
205
206 /* Destructively convert encoded sorted sets for SORT. */
207 if (sortval->type == REDIS_ZSET)
208 zsetConvert(sortval, REDIS_ENCODING_SKIPLIST);
209
210 /* Load the sorting vector with all the objects to sort */
211 switch(sortval->type) {
212 case REDIS_LIST: vectorlen = listTypeLength(sortval); break;
213 case REDIS_SET: vectorlen = setTypeSize(sortval); break;
214 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
215 default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
216 }
217 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
218 j = 0;
219
220 if (sortval->type == REDIS_LIST) {
221 listTypeIterator *li = listTypeInitIterator(sortval,0,REDIS_TAIL);
222 listTypeEntry entry;
223 while(listTypeNext(li,&entry)) {
224 vector[j].obj = listTypeGet(&entry);
225 vector[j].u.score = 0;
226 vector[j].u.cmpobj = NULL;
227 j++;
228 }
229 listTypeReleaseIterator(li);
230 } else if (sortval->type == REDIS_SET) {
231 setTypeIterator *si = setTypeInitIterator(sortval);
232 robj *ele;
233 while((ele = setTypeNextObject(si)) != NULL) {
234 vector[j].obj = ele;
235 vector[j].u.score = 0;
236 vector[j].u.cmpobj = NULL;
237 j++;
238 }
239 setTypeReleaseIterator(si);
240 } else if (sortval->type == REDIS_ZSET) {
241 dict *set = ((zset*)sortval->ptr)->dict;
242 dictIterator *di;
243 dictEntry *setele;
244 di = dictGetIterator(set);
245 while((setele = dictNext(di)) != NULL) {
246 vector[j].obj = dictGetKey(setele);
247 vector[j].u.score = 0;
248 vector[j].u.cmpobj = NULL;
249 j++;
250 }
251 dictReleaseIterator(di);
252 } else {
253 redisPanic("Unknown type");
254 }
255 redisAssertWithInfo(c,sortval,j == vectorlen);
256
257 /* Now it's time to load the right scores in the sorting vector */
258 if (dontsort == 0) {
259 for (j = 0; j < vectorlen; j++) {
260 robj *byval;
261 if (sortby) {
262 /* lookup value to sort by */
263 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
264 if (!byval) continue;
265 } else {
266 /* use object itself to sort by */
267 byval = vector[j].obj;
268 }
269
270 if (alpha) {
271 if (sortby) vector[j].u.cmpobj = getDecodedObject(byval);
272 } else {
273 if (byval->encoding == REDIS_ENCODING_RAW) {
274 char *eptr;
275
276 vector[j].u.score = strtod(byval->ptr,&eptr);
277 if (eptr[0] != '\0' || errno == ERANGE ||
278 isnan(vector[j].u.score))
279 {
280 int_convertion_error = 1;
281 }
282 } else if (byval->encoding == REDIS_ENCODING_INT) {
283 /* Don't need to decode the object if it's
284 * integer-encoded (the only encoding supported) so
285 * far. We can just cast it */
286 vector[j].u.score = (long)byval->ptr;
287 } else {
288 redisAssertWithInfo(c,sortval,1 != 1);
289 }
290 }
291
292 /* when the object was retrieved using lookupKeyByPattern,
293 * its refcount needs to be decreased. */
294 if (sortby) {
295 decrRefCount(byval);
296 }
297 }
298 }
299
300 /* We are ready to sort the vector... perform a bit of sanity check
301 * on the LIMIT option too. We'll use a partial version of quicksort. */
302 start = (limit_start < 0) ? 0 : limit_start;
303 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
304 if (start >= vectorlen) {
305 start = vectorlen-1;
306 end = vectorlen-2;
307 }
308 if (end >= vectorlen) end = vectorlen-1;
309
310 server.sort_dontsort = dontsort;
311 if (dontsort == 0) {
312 server.sort_desc = desc;
313 server.sort_alpha = alpha;
314 server.sort_bypattern = sortby ? 1 : 0;
315 if (sortby && (start != 0 || end != vectorlen-1))
316 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
317 else
318 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
319 }
320
321 /* Send command output to the output buffer, performing the specified
322 * GET/DEL/INCR/DECR operations if any. */
323 outputlen = getop ? getop*(end-start+1) : end-start+1;
324 if (int_convertion_error) {
325 addReplyError(c,"One or more scores can't be converted into double");
326 } else if (storekey == NULL) {
327 /* STORE option not specified, sent the sorting result to client */
328 addReplyMultiBulkLen(c,outputlen);
329 for (j = start; j <= end; j++) {
330 listNode *ln;
331 listIter li;
332
333 if (!getop) addReplyBulk(c,vector[j].obj);
334 listRewind(operations,&li);
335 while((ln = listNext(&li))) {
336 redisSortOperation *sop = ln->value;
337 robj *val = lookupKeyByPattern(c->db,sop->pattern,
338 vector[j].obj);
339
340 if (sop->type == REDIS_SORT_GET) {
341 if (!val) {
342 addReply(c,shared.nullbulk);
343 } else {
344 addReplyBulk(c,val);
345 decrRefCount(val);
346 }
347 } else {
348 /* Always fails */
349 redisAssertWithInfo(c,sortval,sop->type == REDIS_SORT_GET);
350 }
351 }
352 }
353 } else {
354 robj *sobj = createZiplistObject();
355
356 /* STORE option specified, set the sorting result as a List object */
357 for (j = start; j <= end; j++) {
358 listNode *ln;
359 listIter li;
360
361 if (!getop) {
362 listTypePush(sobj,vector[j].obj,REDIS_TAIL);
363 } else {
364 listRewind(operations,&li);
365 while((ln = listNext(&li))) {
366 redisSortOperation *sop = ln->value;
367 robj *val = lookupKeyByPattern(c->db,sop->pattern,
368 vector[j].obj);
369
370 if (sop->type == REDIS_SORT_GET) {
371 if (!val) val = createStringObject("",0);
372
373 /* listTypePush does an incrRefCount, so we should take care
374 * care of the incremented refcount caused by either
375 * lookupKeyByPattern or createStringObject("",0) */
376 listTypePush(sobj,val,REDIS_TAIL);
377 decrRefCount(val);
378 } else {
379 /* Always fails */
380 redisAssertWithInfo(c,sortval,sop->type == REDIS_SORT_GET);
381 }
382 }
383 }
384 }
385 if (outputlen) {
386 setKey(c->db,storekey,sobj);
387 server.dirty += outputlen;
388 } else if (dbDelete(c->db,storekey)) {
389 signalModifiedKey(c->db,storekey);
390 server.dirty++;
391 }
392 decrRefCount(sobj);
393 addReplyLongLong(c,outputlen);
394 }
395
396 /* Cleanup */
397 if (sortval->type == REDIS_LIST || sortval->type == REDIS_SET)
398 for (j = 0; j < vectorlen; j++)
399 decrRefCount(vector[j].obj);
400 decrRefCount(sortval);
401 listRelease(operations);
402 for (j = 0; j < vectorlen; j++) {
403 if (alpha && vector[j].u.cmpobj)
404 decrRefCount(vector[j].u.cmpobj);
405 }
406 zfree(vector);
407 }
408
409