]> git.saurik.com Git - redis.git/blame_incremental - src/sort.c
Tests for MONITOR.
[redis.git] / src / sort.c
... / ...
CommitLineData
1#include "redis.h"
2#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
3#include <math.h> /* isnan() */
4
5redisSortOperation *createSortOperation(int type, robj *pattern) {
6 redisSortOperation *so = zmalloc(sizeof(*so));
7 so->type = type;
8 so->pattern = pattern;
9 return so;
10}
11
12/* Return the value associated to the key with a name obtained
13 * substituting the first occurence of '*' in 'pattern' with 'subst'.
14 * The returned object will always have its refcount increased by 1
15 * when it is non-NULL. */
16robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
17 char *p, *f;
18 sds spat, ssub;
19 robj keyobj, fieldobj, *o;
20 int prefixlen, sublen, postfixlen, fieldlen;
21 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
22 struct {
23 int len;
24 int free;
25 char buf[REDIS_SORTKEY_MAX+1];
26 } keyname, fieldname;
27
28 /* If the pattern is "#" return the substitution object itself in order
29 * to implement the "SORT ... GET #" feature. */
30 spat = pattern->ptr;
31 if (spat[0] == '#' && spat[1] == '\0') {
32 incrRefCount(subst);
33 return subst;
34 }
35
36 /* The substitution object may be specially encoded. If so we create
37 * a decoded object on the fly. Otherwise getDecodedObject will just
38 * increment the ref count, that we'll decrement later. */
39 subst = getDecodedObject(subst);
40
41 ssub = subst->ptr;
42 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
43 p = strchr(spat,'*');
44 if (!p) {
45 decrRefCount(subst);
46 return NULL;
47 }
48
49 /* Find out if we're dealing with a hash dereference. */
50 if ((f = strstr(p+1, "->")) != NULL) {
51 fieldlen = sdslen(spat)-(f-spat);
52 /* this also copies \0 character */
53 memcpy(fieldname.buf,f+2,fieldlen-1);
54 fieldname.len = fieldlen-2;
55 } else {
56 fieldlen = 0;
57 }
58
59 prefixlen = p-spat;
60 sublen = sdslen(ssub);
61 postfixlen = sdslen(spat)-(prefixlen+1)-fieldlen;
62 memcpy(keyname.buf,spat,prefixlen);
63 memcpy(keyname.buf+prefixlen,ssub,sublen);
64 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
65 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
66 keyname.len = prefixlen+sublen+postfixlen;
67 decrRefCount(subst);
68
69 /* Lookup substituted key */
70 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(struct sdshdr)));
71 o = lookupKeyRead(db,&keyobj);
72 if (o == NULL) return NULL;
73
74 if (fieldlen > 0) {
75 if (o->type != REDIS_HASH || fieldname.len < 1) return NULL;
76
77 /* Retrieve value from hash by the field name. This operation
78 * already increases the refcount of the returned object. */
79 initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(struct sdshdr)));
80 o = hashTypeGetObject(o, &fieldobj);
81 } else {
82 if (o->type != REDIS_STRING) return NULL;
83
84 /* Every object that this function returns needs to have its refcount
85 * increased. sortCommand decreases it again. */
86 incrRefCount(o);
87 }
88
89 return o;
90}
91
92/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
93 * the additional parameter is not standard but a BSD-specific we have to
94 * pass sorting parameters via the global 'server' structure */
95int sortCompare(const void *s1, const void *s2) {
96 const redisSortObject *so1 = s1, *so2 = s2;
97 int cmp;
98
99 if (!server.sort_alpha) {
100 /* Numeric sorting. Here it's trivial as we precomputed scores */
101 if (so1->u.score > so2->u.score) {
102 cmp = 1;
103 } else if (so1->u.score < so2->u.score) {
104 cmp = -1;
105 } else {
106 /* Objects have the same score, but we don't want the comparison
107 * to be undefined, so we compare objects lexicographycally.
108 * This way the result of SORT is deterministic. */
109 cmp = compareStringObjects(so1->obj,so2->obj);
110 }
111 } else {
112 /* Alphanumeric sorting */
113 if (server.sort_bypattern) {
114 if (!so1->u.cmpobj || !so2->u.cmpobj) {
115 /* At least one compare object is NULL */
116 if (so1->u.cmpobj == so2->u.cmpobj)
117 cmp = 0;
118 else if (so1->u.cmpobj == NULL)
119 cmp = -1;
120 else
121 cmp = 1;
122 } else {
123 /* We have both the objects, use strcoll */
124 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
125 }
126 } else {
127 /* Compare elements directly. */
128 cmp = compareStringObjects(so1->obj,so2->obj);
129 }
130 }
131 return server.sort_desc ? -cmp : cmp;
132}
133
134/* The SORT command is the most complex command in Redis. Warning: this code
135 * is optimized for speed and a bit less for readability */
136void sortCommand(redisClient *c) {
137 list *operations;
138 unsigned int outputlen = 0;
139 int desc = 0, alpha = 0;
140 long limit_start = 0, limit_count = -1, start, end;
141 int j, dontsort = 0, vectorlen;
142 int getop = 0; /* GET operation counter */
143 int int_convertion_error = 0;
144 robj *sortval, *sortby = NULL, *storekey = NULL;
145 redisSortObject *vector; /* Resulting vector to sort */
146
147 /* Lookup the key to sort. It must be of the right types */
148 sortval = lookupKeyRead(c->db,c->argv[1]);
149 if (sortval && sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
150 sortval->type != REDIS_ZSET)
151 {
152 addReply(c,shared.wrongtypeerr);
153 return;
154 }
155
156 /* Create a list of operations to perform for every sorted element.
157 * Operations can be GET/DEL/INCR/DECR */
158 operations = listCreate();
159 listSetFreeMethod(operations,zfree);
160 j = 2;
161
162 /* Now we need to protect sortval incrementing its count, in the future
163 * SORT may have options able to overwrite/delete keys during the sorting
164 * and the sorted key itself may get destroied */
165 if (sortval)
166 incrRefCount(sortval);
167 else
168 sortval = createListObject();
169
170 /* The SORT command has an SQL-alike syntax, parse it */
171 while(j < c->argc) {
172 int leftargs = c->argc-j-1;
173 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
174 desc = 0;
175 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
176 desc = 1;
177 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
178 alpha = 1;
179 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
180 if ((getLongFromObjectOrReply(c, c->argv[j+1], &limit_start, NULL) != REDIS_OK) ||
181 (getLongFromObjectOrReply(c, c->argv[j+2], &limit_count, NULL) != REDIS_OK)) return;
182 j+=2;
183 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
184 storekey = c->argv[j+1];
185 j++;
186 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
187 sortby = c->argv[j+1];
188 /* If the BY pattern does not contain '*', i.e. it is constant,
189 * we don't need to sort nor to lookup the weight keys. */
190 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
191 j++;
192 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
193 listAddNodeTail(operations,createSortOperation(
194 REDIS_SORT_GET,c->argv[j+1]));
195 getop++;
196 j++;
197 } else {
198 decrRefCount(sortval);
199 listRelease(operations);
200 addReply(c,shared.syntaxerr);
201 return;
202 }
203 j++;
204 }
205
206 /* If we have STORE we need to force sorting for deterministic output
207 * and replication. We use alpha sorting since this is guaranteed to
208 * work with any input. */
209 if (storekey && dontsort) {
210 dontsort = 0;
211 alpha = 1;
212 sortby = NULL;
213 }
214
215 /* Destructively convert encoded sorted sets for SORT. */
216 if (sortval->type == REDIS_ZSET)
217 zsetConvert(sortval, REDIS_ENCODING_SKIPLIST);
218
219 /* Load the sorting vector with all the objects to sort */
220 switch(sortval->type) {
221 case REDIS_LIST: vectorlen = listTypeLength(sortval); break;
222 case REDIS_SET: vectorlen = setTypeSize(sortval); break;
223 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
224 default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
225 }
226 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
227 j = 0;
228
229 if (sortval->type == REDIS_LIST) {
230 listTypeIterator *li = listTypeInitIterator(sortval,0,REDIS_TAIL);
231 listTypeEntry entry;
232 while(listTypeNext(li,&entry)) {
233 vector[j].obj = listTypeGet(&entry);
234 vector[j].u.score = 0;
235 vector[j].u.cmpobj = NULL;
236 j++;
237 }
238 listTypeReleaseIterator(li);
239 } else if (sortval->type == REDIS_SET) {
240 setTypeIterator *si = setTypeInitIterator(sortval);
241 robj *ele;
242 while((ele = setTypeNextObject(si)) != NULL) {
243 vector[j].obj = ele;
244 vector[j].u.score = 0;
245 vector[j].u.cmpobj = NULL;
246 j++;
247 }
248 setTypeReleaseIterator(si);
249 } else if (sortval->type == REDIS_ZSET) {
250 dict *set = ((zset*)sortval->ptr)->dict;
251 dictIterator *di;
252 dictEntry *setele;
253 di = dictGetIterator(set);
254 while((setele = dictNext(di)) != NULL) {
255 vector[j].obj = dictGetKey(setele);
256 vector[j].u.score = 0;
257 vector[j].u.cmpobj = NULL;
258 j++;
259 }
260 dictReleaseIterator(di);
261 } else {
262 redisPanic("Unknown type");
263 }
264 redisAssertWithInfo(c,sortval,j == vectorlen);
265
266 /* Now it's time to load the right scores in the sorting vector */
267 if (dontsort == 0) {
268 for (j = 0; j < vectorlen; j++) {
269 robj *byval;
270 if (sortby) {
271 /* lookup value to sort by */
272 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
273 if (!byval) continue;
274 } else {
275 /* use object itself to sort by */
276 byval = vector[j].obj;
277 }
278
279 if (alpha) {
280 if (sortby) vector[j].u.cmpobj = getDecodedObject(byval);
281 } else {
282 if (byval->encoding == REDIS_ENCODING_RAW) {
283 char *eptr;
284
285 vector[j].u.score = strtod(byval->ptr,&eptr);
286 if (eptr[0] != '\0' || errno == ERANGE ||
287 isnan(vector[j].u.score))
288 {
289 int_convertion_error = 1;
290 }
291 } else if (byval->encoding == REDIS_ENCODING_INT) {
292 /* Don't need to decode the object if it's
293 * integer-encoded (the only encoding supported) so
294 * far. We can just cast it */
295 vector[j].u.score = (long)byval->ptr;
296 } else {
297 redisAssertWithInfo(c,sortval,1 != 1);
298 }
299 }
300
301 /* when the object was retrieved using lookupKeyByPattern,
302 * its refcount needs to be decreased. */
303 if (sortby) {
304 decrRefCount(byval);
305 }
306 }
307 }
308
309 /* We are ready to sort the vector... perform a bit of sanity check
310 * on the LIMIT option too. We'll use a partial version of quicksort. */
311 start = (limit_start < 0) ? 0 : limit_start;
312 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
313 if (start >= vectorlen) {
314 start = vectorlen-1;
315 end = vectorlen-2;
316 }
317 if (end >= vectorlen) end = vectorlen-1;
318
319 server.sort_dontsort = dontsort;
320 if (dontsort == 0) {
321 server.sort_desc = desc;
322 server.sort_alpha = alpha;
323 server.sort_bypattern = sortby ? 1 : 0;
324 if (sortby && (start != 0 || end != vectorlen-1))
325 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
326 else
327 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
328 }
329
330 /* Send command output to the output buffer, performing the specified
331 * GET/DEL/INCR/DECR operations if any. */
332 outputlen = getop ? getop*(end-start+1) : end-start+1;
333 if (int_convertion_error) {
334 addReplyError(c,"One or more scores can't be converted into double");
335 } else if (storekey == NULL) {
336 /* STORE option not specified, sent the sorting result to client */
337 addReplyMultiBulkLen(c,outputlen);
338 for (j = start; j <= end; j++) {
339 listNode *ln;
340 listIter li;
341
342 if (!getop) addReplyBulk(c,vector[j].obj);
343 listRewind(operations,&li);
344 while((ln = listNext(&li))) {
345 redisSortOperation *sop = ln->value;
346 robj *val = lookupKeyByPattern(c->db,sop->pattern,
347 vector[j].obj);
348
349 if (sop->type == REDIS_SORT_GET) {
350 if (!val) {
351 addReply(c,shared.nullbulk);
352 } else {
353 addReplyBulk(c,val);
354 decrRefCount(val);
355 }
356 } else {
357 /* Always fails */
358 redisAssertWithInfo(c,sortval,sop->type == REDIS_SORT_GET);
359 }
360 }
361 }
362 } else {
363 robj *sobj = createZiplistObject();
364
365 /* STORE option specified, set the sorting result as a List object */
366 for (j = start; j <= end; j++) {
367 listNode *ln;
368 listIter li;
369
370 if (!getop) {
371 listTypePush(sobj,vector[j].obj,REDIS_TAIL);
372 } else {
373 listRewind(operations,&li);
374 while((ln = listNext(&li))) {
375 redisSortOperation *sop = ln->value;
376 robj *val = lookupKeyByPattern(c->db,sop->pattern,
377 vector[j].obj);
378
379 if (sop->type == REDIS_SORT_GET) {
380 if (!val) val = createStringObject("",0);
381
382 /* listTypePush does an incrRefCount, so we should take care
383 * care of the incremented refcount caused by either
384 * lookupKeyByPattern or createStringObject("",0) */
385 listTypePush(sobj,val,REDIS_TAIL);
386 decrRefCount(val);
387 } else {
388 /* Always fails */
389 redisAssertWithInfo(c,sortval,sop->type == REDIS_SORT_GET);
390 }
391 }
392 }
393 }
394 if (outputlen) {
395 setKey(c->db,storekey,sobj);
396 server.dirty += outputlen;
397 } else if (dbDelete(c->db,storekey)) {
398 signalModifiedKey(c->db,storekey);
399 server.dirty++;
400 }
401 decrRefCount(sobj);
402 addReplyLongLong(c,outputlen);
403 }
404
405 /* Cleanup */
406 if (sortval->type == REDIS_LIST || sortval->type == REDIS_SET)
407 for (j = 0; j < vectorlen; j++)
408 decrRefCount(vector[j].obj);
409 decrRefCount(sortval);
410 listRelease(operations);
411 for (j = 0; j < vectorlen; j++) {
412 if (alpha && vector[j].u.cmpobj)
413 decrRefCount(vector[j].u.cmpobj);
414 }
415 zfree(vector);
416}
417
418