]>
git.saurik.com Git - redis.git/blob - src/sort.c
3f02e49a709a4cbf32c6d4ba92480de55d218f05
   2 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ 
   3 #include <math.h> /* isnan() */ 
   /* Allocate a new redisSortOperation with zmalloc(), sized from the pointee
    * of 'so'. Takes the operation 'type' and the 'pattern' object supplied by
    * the caller.
    * NOTE(review): the remainder of the body (presumably storing 'type' and
    * 'pattern' into the new operation and returning it) is not visible in this
    * extract -- confirm against the full file. */
   5 redisSortOperation 
*createSortOperation(int type
, robj 
*pattern
) { 
   6     redisSortOperation 
*so 
= zmalloc(sizeof(*so
)); 
  12 /* Return the value associated to the key with a name obtained 
  13  * substituting the first occurrence of '*' in 'pattern' with 'subst'. 
  14  * The returned object will always have its refcount increased by 1 
  15  * when it is non-NULL. 
      *
      * A pattern that is exactly "#" is special-cased (see below). A "->"
      * located by the strstr() call below marks a hash field dereference: in
      * that case the looked-up key is checked for type REDIS_HASH and the
      * named field is fetched from it with hashTypeGetObject().
      *
      * NULL is returned when the combined pattern/substitution length exceeds
      * REDIS_SORTKEY_MAX, when the substituted key does not exist, or when the
      * looked-up object fails the type checks below.
      *
      * NOTE(review): this listing has gaps (the embedded line numbers jump),
      * so the declarations of spat, ssub, p, f, keyname and fieldname, and
      * several statements and closing braces, are not visible here. */ 
  16 robj 
*lookupKeyByPattern(redisDb 
*db
, robj 
*pattern
, robj 
*subst
) { 
  19     robj keyobj
, fieldobj
, *o
; 
  20     int prefixlen
, sublen
, postfixlen
, fieldlen
; 
  21     /* Exploit the internal sds representation to create an sds string allocated on the stack in order to make this function faster */ 
  25         char buf
[REDIS_SORTKEY_MAX
+1]; 
  28     /* If the pattern is "#" return the substitution object itself in order 
  29      * to implement the "SORT ... GET #" feature. */ 
  31     if (spat
[0] == '#' && spat
[1] == '\0') { 
  36     /* The substitution object may be specially encoded. If so we create 
  37      * a decoded object on the fly. Otherwise getDecodedObject will just 
  38      * increment the ref count, that we'll decrement later. */ 
  39     subst 
= getDecodedObject(subst
); 
      /* Refuse expansions that could not fit in the REDIS_SORTKEY_MAX+1 byte
       * stack buffer declared above. */
  42     if (sdslen(spat
)+sdslen(ssub
)-1 > REDIS_SORTKEY_MAX
) return NULL
; 
      /* NOTE(review): subst holds the reference taken by getDecodedObject()
       * above, and no release of it is visible on this early-return path --
       * confirm this does not leak a reference. */
  49     /* Find out if we're dealing with a hash dereference. */ 
  50     if ((f 
= strstr(p
+1, "->")) != NULL
) { 
          /* fieldlen counts every byte of spat from the "->" to its end. */
  51         fieldlen 
= sdslen(spat
)-(f
-spat
); 
  52         /* Copy the field name that follows "->"; the length fieldlen-1 also covers the terminating \0 character */ 
  53         memcpy(fieldname
.buf
,f
+2,fieldlen
-1); 
  54         fieldname
.len 
= fieldlen
-2; 
      /* Assemble the key name as: the first prefixlen bytes of spat, then the
       * substitution string, then postfixlen bytes starting at p+1. The result
       * is NUL-terminated and its length recorded in keyname.len. */
  60     sublen 
= sdslen(ssub
); 
  61     postfixlen 
= sdslen(spat
)-(prefixlen
+1)-fieldlen
; 
  62     memcpy(keyname
.buf
,spat
,prefixlen
); 
  63     memcpy(keyname
.buf
+prefixlen
,ssub
,sublen
); 
  64     memcpy(keyname
.buf
+prefixlen
+sublen
,p
+1,postfixlen
); 
  65     keyname
.buf
[prefixlen
+sublen
+postfixlen
] = '\0'; 
  66     keyname
.len 
= prefixlen
+sublen
+postfixlen
; 
  69     /* Lookup substituted key */ 
      /* The static object is pointed sizeof(struct sdshdr) bytes past the
       * start of keyname, i.e. at the string bytes that follow the header. */
  70     initStaticStringObject(keyobj
,((char*)&keyname
)+(sizeof(struct sdshdr
))); 
  71     o 
= lookupKeyRead(db
,&keyobj
); 
  72     if (o 
== NULL
) return NULL
; 
          /* A hash dereference needs a hash value and a non-empty field name. */
  75         if (o
->type 
!= REDIS_HASH 
|| fieldname
.len 
< 1) return NULL
; 
  77         /* Retrieve value from hash by the field name. This operation 
  78          * already increases the refcount of the returned object. */ 
  79         initStaticStringObject(fieldobj
,((char*)&fieldname
)+(sizeof(struct sdshdr
))); 
  80         o 
= hashTypeGetObject(o
, &fieldobj
); 
          /* Only string values are accepted on this path. */
  82         if (o
->type 
!= REDIS_STRING
) return NULL
; 
  84         /* Every object that this function returns needs to have its refcount 
  85          * increased. sortCommand decreases it again. */ 
  92 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with 
  93  * the additional parameter is not standard but BSD-specific, we have to 
  94  * pass sorting parameters via the global 'server' structure. 
      *
      * 's1' and 's2' point to redisSortObject elements. When server.sort_alpha
      * is not set the precomputed u.score values are compared, with
      * compareStringObjects() on the elements as a tie-breaker. Otherwise, if
      * server.sort_bypattern is set the u.cmpobj strings are compared with
      * strcoll() (NULL compare objects are handled separately); if it is not
      * set the elements themselves are compared with compareStringObjects().
      * The result is negated when server.sort_desc is set.
      *
      * NOTE(review): this listing has gaps (the embedded line numbers jump),
      * so the declaration of 'cmp' and the bodies of several branches are
      * not visible here. */ 
  95 int sortCompare(const void *s1
, const void *s2
) { 
  96     const redisSortObject 
*so1 
= s1
, *so2 
= s2
; 
  99     if (!server
.sort_alpha
) { 
 100         /* Numeric sorting. Here it's trivial as we precomputed scores */ 
 101         if (so1
->u
.score 
> so2
->u
.score
) { 
 103         } else if (so1
->u
.score 
< so2
->u
.score
) { 
 106             /* Objects have the same score, but we don't want the comparison 
 107              * to be undefined, so we compare objects lexicographically. 
 108              * This way the result of SORT is deterministic. */ 
 109             cmp 
= compareStringObjects(so1
->obj
,so2
->obj
); 
 112         /* Alphanumeric sorting */ 
 113         if (server
.sort_bypattern
) { 
 114             if (!so1
->u
.cmpobj 
|| !so2
->u
.cmpobj
) { 
 115                 /* At least one compare object is NULL */ 
                     /* Equal pointers at this point mean both are NULL. */
 116                 if (so1
->u
.cmpobj 
== so2
->u
.cmpobj
) 
 118                 else if (so1
->u
.cmpobj 
== NULL
) 
 123                 /* We have both the objects, use strcoll */ 
 124                 cmp 
= strcoll(so1
->u
.cmpobj
->ptr
,so2
->u
.cmpobj
->ptr
); 
 127             /* Compare elements directly. */ 
 128             cmp 
= compareStringObjects(so1
->obj
,so2
->obj
); 
         /* Flip the sign of the result for descending order. */
 131     return server
.sort_desc 
? -cmp 
: cmp
; 
 134 /* The SORT command is the most complex command in Redis. Warning: this code 
 135  * is optimized for speed and a bit less for readability */ 
 136 void sortCommand(redisClient 
*c
) { 
 138     unsigned int outputlen 
= 0; 
 139     int desc 
= 0, alpha 
= 0; 
 140     long limit_start 
= 0, limit_count 
= -1, start
, end
; 
 141     int j
, dontsort 
= 0, vectorlen
; 
 142     int getop 
= 0; /* GET operation counter */ 
 143     int int_convertion_error 
= 0; 
 144     robj 
*sortval
, *sortby 
= NULL
, *storekey 
= NULL
; 
 145     redisSortObject 
*vector
; /* Resulting vector to sort */ 
 147     /* Lookup the key to sort. It must be of the right types */ 
 148     sortval 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
 149     if (sortval 
&& sortval
->type 
!= REDIS_SET 
&& sortval
->type 
!= REDIS_LIST 
&& 
 150         sortval
->type 
!= REDIS_ZSET
) 
 152         addReply(c
,shared
.wrongtypeerr
); 
 156     /* Create a list of operations to perform for every sorted element. 
 157      * Operations can be GET/DEL/INCR/DECR */ 
 158     operations 
= listCreate(); 
 159     listSetFreeMethod(operations
,zfree
); 
 162     /* Now we need to protect sortval incrementing its count, in the future 
 163      * SORT may have options able to overwrite/delete keys during the sorting 
 164      * and the sorted key itself may get destroyed */ 
 166         incrRefCount(sortval
); 
 168         sortval 
= createListObject(); 
 170     /* The SORT command has an SQL-alike syntax, parse it */ 
 172         int leftargs 
= c
->argc
-j
-1; 
 173         if (!strcasecmp(c
->argv
[j
]->ptr
,"asc")) { 
 175         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"desc")) { 
 177         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"alpha")) { 
 179         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"limit") && leftargs 
>= 2) { 
 180             if ((getLongFromObjectOrReply(c
, c
->argv
[j
+1], &limit_start
, NULL
) != REDIS_OK
) || 
 181                 (getLongFromObjectOrReply(c
, c
->argv
[j
+2], &limit_count
, NULL
) != REDIS_OK
)) return; 
 183         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"store") && leftargs 
>= 1) { 
 184             storekey 
= c
->argv
[j
+1]; 
 186         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"by") && leftargs 
>= 1) { 
 187             sortby 
= c
->argv
[j
+1]; 
 188             /* If the BY pattern does not contain '*', i.e. it is constant, 
 189              * we don't need to sort nor to lookup the weight keys. */ 
 190             if (strchr(c
->argv
[j
+1]->ptr
,'*') == NULL
) dontsort 
= 1; 
 192         } else if (!strcasecmp(c
->argv
[j
]->ptr
,"get") && leftargs 
>= 1) { 
 193             listAddNodeTail(operations
,createSortOperation( 
 194                 REDIS_SORT_GET
,c
->argv
[j
+1])); 
 198             decrRefCount(sortval
); 
 199             listRelease(operations
); 
 200             addReply(c
,shared
.syntaxerr
); 
 206     /* If we have STORE we need to force sorting for deterministic output 
 207      * and replication. We use alpha sorting since this is guaranteed to 
 208      * work with any input. */ 
 209     if (storekey 
&& dontsort
) { 
 215     /* Destructively convert encoded sorted sets for SORT. */ 
 216     if (sortval
->type 
== REDIS_ZSET
) 
 217         zsetConvert(sortval
, REDIS_ENCODING_SKIPLIST
); 
 219     /* Load the sorting vector with all the objects to sort */ 
 220     switch(sortval
->type
) { 
 221     case REDIS_LIST
: vectorlen 
= listTypeLength(sortval
); break; 
 222     case REDIS_SET
: vectorlen 
=  setTypeSize(sortval
); break; 
 223     case REDIS_ZSET
: vectorlen 
= dictSize(((zset
*)sortval
->ptr
)->dict
); break; 
 224     default: vectorlen 
= 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */ 
 226     vector 
= zmalloc(sizeof(redisSortObject
)*vectorlen
); 
 229     if (sortval
->type 
== REDIS_LIST
) { 
 230         listTypeIterator 
*li 
= listTypeInitIterator(sortval
,0,REDIS_TAIL
); 
 232         while(listTypeNext(li
,&entry
)) { 
 233             vector
[j
].obj 
= listTypeGet(&entry
); 
 234             vector
[j
].u
.score 
= 0; 
 235             vector
[j
].u
.cmpobj 
= NULL
; 
 238         listTypeReleaseIterator(li
); 
 239     } else if (sortval
->type 
== REDIS_SET
) { 
 240         setTypeIterator 
*si 
= setTypeInitIterator(sortval
); 
 242         while((ele 
= setTypeNextObject(si
)) != NULL
) { 
 244             vector
[j
].u
.score 
= 0; 
 245             vector
[j
].u
.cmpobj 
= NULL
; 
 248         setTypeReleaseIterator(si
); 
 249     } else if (sortval
->type 
== REDIS_ZSET
) { 
 250         dict 
*set 
= ((zset
*)sortval
->ptr
)->dict
; 
 253         di 
= dictGetIterator(set
); 
 254         while((setele 
= dictNext(di
)) != NULL
) { 
 255             vector
[j
].obj 
= dictGetKey(setele
); 
 256             vector
[j
].u
.score 
= 0; 
 257             vector
[j
].u
.cmpobj 
= NULL
; 
 260         dictReleaseIterator(di
); 
 262         redisPanic("Unknown type"); 
 264     redisAssertWithInfo(c
,sortval
,j 
== vectorlen
); 
 266     /* Now it's time to load the right scores in the sorting vector */ 
 268         for (j 
= 0; j 
< vectorlen
; j
++) { 
 271                 /* lookup value to sort by */ 
 272                 byval 
= lookupKeyByPattern(c
->db
,sortby
,vector
[j
].obj
); 
 273                 if (!byval
) continue; 
 275                 /* use object itself to sort by */ 
 276                 byval 
= vector
[j
].obj
; 
 280                 if (sortby
) vector
[j
].u
.cmpobj 
= getDecodedObject(byval
); 
 282                 if (byval
->encoding 
== REDIS_ENCODING_RAW
) { 
 285                     vector
[j
].u
.score 
= strtod(byval
->ptr
,&eptr
); 
 286                     if (eptr
[0] != '\0' || errno 
== ERANGE 
|| 
 287                         isnan(vector
[j
].u
.score
)) 
 289                         int_convertion_error 
= 1; 
 291                 } else if (byval
->encoding 
== REDIS_ENCODING_INT
) { 
 292                     /* Don't need to decode the object if it's 
 293                      * integer-encoded (the only encoding supported so 
 294                      * far). We can just cast it */ 
 295                     vector
[j
].u
.score 
= (long)byval
->ptr
; 
 297                     redisAssertWithInfo(c
,sortval
,1 != 1); 
 301             /* when the object was retrieved using lookupKeyByPattern, 
 302              * its refcount needs to be decreased. */ 
 309     /* We are ready to sort the vector... perform a bit of sanity check 
 310      * on the LIMIT option too. We'll use a partial version of quicksort. */ 
 311     start 
= (limit_start 
< 0) ? 0 : limit_start
; 
 312     end 
= (limit_count 
< 0) ? vectorlen
-1 : start
+limit_count
-1; 
 313     if (start 
>= vectorlen
) { 
 317     if (end 
>= vectorlen
) end 
= vectorlen
-1; 
 319     server
.sort_dontsort 
= dontsort
; 
 321         server
.sort_desc 
= desc
; 
 322         server
.sort_alpha 
= alpha
; 
 323         server
.sort_bypattern 
= sortby 
? 1 : 0; 
 324         if (sortby 
&& (start 
!= 0 || end 
!= vectorlen
-1)) 
 325             pqsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
, start
,end
); 
 327             qsort(vector
,vectorlen
,sizeof(redisSortObject
),sortCompare
); 
 330     /* Send command output to the output buffer, performing the specified 
 331      * GET/DEL/INCR/DECR operations if any. */ 
 332     outputlen 
= getop 
? getop
*(end
-start
+1) : end
-start
+1; 
 333     if (int_convertion_error
) { 
 334         addReplyError(c
,"One or more scores can't be converted into double"); 
 335     } else if (storekey 
== NULL
) { 
 336         /* STORE option not specified, send the sorting result to the client */ 
 337         addReplyMultiBulkLen(c
,outputlen
); 
 338         for (j 
= start
; j 
<= end
; j
++) { 
 342             if (!getop
) addReplyBulk(c
,vector
[j
].obj
); 
 343             listRewind(operations
,&li
); 
 344             while((ln 
= listNext(&li
))) { 
 345                 redisSortOperation 
*sop 
= ln
->value
; 
 346                 robj 
*val 
= lookupKeyByPattern(c
->db
,sop
->pattern
, 
 349                 if (sop
->type 
== REDIS_SORT_GET
) { 
 351                         addReply(c
,shared
.nullbulk
); 
 358                     redisAssertWithInfo(c
,sortval
,sop
->type 
== REDIS_SORT_GET
); 
 363         robj 
*sobj 
= createZiplistObject(); 
 365         /* STORE option specified, set the sorting result as a List object */ 
 366         for (j 
= start
; j 
<= end
; j
++) { 
 371                 listTypePush(sobj
,vector
[j
].obj
,REDIS_TAIL
); 
 373                 listRewind(operations
,&li
); 
 374                 while((ln 
= listNext(&li
))) { 
 375                     redisSortOperation 
*sop 
= ln
->value
; 
 376                     robj 
*val 
= lookupKeyByPattern(c
->db
,sop
->pattern
, 
 379                     if (sop
->type 
== REDIS_SORT_GET
) { 
 380                         if (!val
) val 
= createStringObject("",0); 
 382                         /* listTypePush does an incrRefCount, so we should take care 
 383                          * of the incremented refcount caused by either 
 384                          * lookupKeyByPattern or createStringObject("",0) */ 
 385                         listTypePush(sobj
,val
,REDIS_TAIL
); 
 389                         redisAssertWithInfo(c
,sortval
,sop
->type 
== REDIS_SORT_GET
); 
 395             setKey(c
->db
,storekey
,sobj
); 
 396             server
.dirty 
+= outputlen
; 
 397         } else if (dbDelete(c
->db
,storekey
)) { 
 398             signalModifiedKey(c
->db
,storekey
); 
 402         addReplyLongLong(c
,outputlen
); 
 406     if (sortval
->type 
== REDIS_LIST 
|| sortval
->type 
== REDIS_SET
) 
 407         for (j 
= 0; j 
< vectorlen
; j
++) 
 408             decrRefCount(vector
[j
].obj
); 
 409     decrRefCount(sortval
); 
 410     listRelease(operations
); 
 411     for (j 
= 0; j 
< vectorlen
; j
++) { 
 412         if (alpha 
&& vector
[j
].u
.cmpobj
) 
 413             decrRefCount(vector
[j
].u
.cmpobj
);