#include "redis.h"
#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
+#include <math.h> /* isnan() */
redisSortOperation *createSortOperation(int type, robj *pattern) {
redisSortOperation *so = zmalloc(sizeof(*so));
} else if (so1->u.score < so2->u.score) {
cmp = -1;
} else {
- cmp = 0;
+ /* Objects have the same score, but we don't want the comparison
+ * to be undefined, so we compare objects lexicographically.
+ * This way the result of SORT is deterministic. */
+ cmp = compareStringObjects(so1->obj,so2->obj);
}
} else {
/* Alphanumeric sorting */
list *operations;
unsigned int outputlen = 0;
int desc = 0, alpha = 0;
- int limit_start = 0, limit_count = -1, start, end;
+ long limit_start = 0, limit_count = -1, start, end;
int j, dontsort = 0, vectorlen;
int getop = 0; /* GET operation counter */
+ int int_convertion_error = 0;
robj *sortval, *sortby = NULL, *storekey = NULL;
redisSortObject *vector; /* Resulting vector to sort */
} else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
alpha = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
- limit_start = atoi(c->argv[j+1]->ptr);
- limit_count = atoi(c->argv[j+2]->ptr);
+ if ((getLongFromObjectOrReply(c, c->argv[j+1], &limit_start, NULL) != REDIS_OK) ||
+ (getLongFromObjectOrReply(c, c->argv[j+2], &limit_count, NULL) != REDIS_OK)) return;
j+=2;
} else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
storekey = c->argv[j+1];
j++;
}
+ /* If we have STORE we need to force sorting for deterministic output
+ * and replication. We use alpha sorting since this is guaranteed to
+ * work with any input. */
+ if (storekey && dontsort) {
+ dontsort = 0;
+ alpha = 1;
+ sortby = NULL;
+ }
+
/* Destructively convert encoded sorted sets for SORT. */
if (sortval->type == REDIS_ZSET)
zsetConvert(sortval, REDIS_ENCODING_SKIPLIST);
if (sortby) vector[j].u.cmpobj = getDecodedObject(byval);
} else {
if (byval->encoding == REDIS_ENCODING_RAW) {
- vector[j].u.score = strtod(byval->ptr,NULL);
+ char *eptr;
+
+ vector[j].u.score = strtod(byval->ptr,&eptr);
+ if (eptr[0] != '\0' || errno == ERANGE ||
+ isnan(vector[j].u.score))
+ {
+ int_convertion_error = 1;
+ }
} else if (byval->encoding == REDIS_ENCODING_INT) {
/* Don't need to decode the object if it's
* integer-encoded (the only encoding supported) so
}
if (end >= vectorlen) end = vectorlen-1;
+ server.sort_dontsort = dontsort;
if (dontsort == 0) {
server.sort_desc = desc;
server.sort_alpha = alpha;
/* Send command output to the output buffer, performing the specified
* GET/DEL/INCR/DECR operations if any. */
outputlen = getop ? getop*(end-start+1) : end-start+1;
- if (storekey == NULL) {
+ if (int_convertion_error) {
+ addReplyError(c,"One or more scores can't be converted into double");
+ } else if (storekey == NULL) {
/* STORE option not specified, send the sorting result to client */
addReplyMultiBulkLen(c,outputlen);
for (j = start; j <= end; j++) {
}
}
}
- if (outputlen) setKey(c->db,storekey,sobj);
+ if (outputlen) {
+ setKey(c->db,storekey,sobj);
+ server.dirty += outputlen;
+ } else if (dbDelete(c->db,storekey)) {
+ signalModifiedKey(c->db,storekey);
+ server.dirty++;
+ }
decrRefCount(sobj);
- server.dirty += outputlen;
addReplyLongLong(c,outputlen);
}