/*-----------------------------------------------------------------------------
 * Set Commands
 *----------------------------------------------------------------------------*/
   7 /* Factory method to return a set that *can* hold "value". When the object has 
   8  * an integer-encodable value, an intset will be returned. Otherwise a regular 
  10 robj 
*setTypeCreate(robj 
*value
) { 
  11     if (isObjectRepresentableAsLongLong(value
,NULL
) == REDIS_OK
) 
  12         return createIntsetObject(); 
  13     return createSetObject(); 
  16 int setTypeAdd(robj 
*subject
, robj 
*value
) { 
  18     if (subject
->encoding 
== REDIS_ENCODING_HT
) { 
  19         if (dictAdd(subject
->ptr
,value
,NULL
) == DICT_OK
) { 
  23     } else if (subject
->encoding 
== REDIS_ENCODING_INTSET
) { 
  24         if (isObjectRepresentableAsLongLong(value
,&llval
) == REDIS_OK
) { 
  26             subject
->ptr 
= intsetAdd(subject
->ptr
,llval
,&success
); 
  28                 /* Convert to regular set when the intset contains 
  29                  * too many entries. */ 
  30                 if (intsetLen(subject
->ptr
) > server
.set_max_intset_entries
) 
  31                     setTypeConvert(subject
,REDIS_ENCODING_HT
); 
  35             /* Failed to get integer from object, convert to regular set. */ 
  36             setTypeConvert(subject
,REDIS_ENCODING_HT
); 
  38             /* The set *was* an intset and this value is not integer 
  39              * encodable, so dictAdd should always work. */ 
  40             redisAssertWithInfo(NULL
,value
,dictAdd(subject
->ptr
,value
,NULL
) == DICT_OK
); 
  45         redisPanic("Unknown set encoding"); 
  50 int setTypeRemove(robj 
*setobj
, robj 
*value
) { 
  52     if (setobj
->encoding 
== REDIS_ENCODING_HT
) { 
  53         if (dictDelete(setobj
->ptr
,value
) == DICT_OK
) { 
  54             if (htNeedsResize(setobj
->ptr
)) dictResize(setobj
->ptr
); 
  57     } else if (setobj
->encoding 
== REDIS_ENCODING_INTSET
) { 
  58         if (isObjectRepresentableAsLongLong(value
,&llval
) == REDIS_OK
) { 
  60             setobj
->ptr 
= intsetRemove(setobj
->ptr
,llval
,&success
); 
  61             if (success
) return 1; 
  64         redisPanic("Unknown set encoding"); 
  69 int setTypeIsMember(robj 
*subject
, robj 
*value
) { 
  71     if (subject
->encoding 
== REDIS_ENCODING_HT
) { 
  72         return dictFind((dict
*)subject
->ptr
,value
) != NULL
; 
  73     } else if (subject
->encoding 
== REDIS_ENCODING_INTSET
) { 
  74         if (isObjectRepresentableAsLongLong(value
,&llval
) == REDIS_OK
) { 
  75             return intsetFind((intset
*)subject
->ptr
,llval
); 
  78         redisPanic("Unknown set encoding"); 
  83 setTypeIterator 
*setTypeInitIterator(robj 
*subject
) { 
  84     setTypeIterator 
*si 
= zmalloc(sizeof(setTypeIterator
)); 
  85     si
->subject 
= subject
; 
  86     si
->encoding 
= subject
->encoding
; 
  87     if (si
->encoding 
== REDIS_ENCODING_HT
) { 
  88         si
->di 
= dictGetIterator(subject
->ptr
); 
  89     } else if (si
->encoding 
== REDIS_ENCODING_INTSET
) { 
  92         redisPanic("Unknown set encoding"); 
  97 void setTypeReleaseIterator(setTypeIterator 
*si
) { 
  98     if (si
->encoding 
== REDIS_ENCODING_HT
) 
  99         dictReleaseIterator(si
->di
); 
 103 /* Move to the next entry in the set. Returns the object at the current 
 106  * Since set elements can be internally be stored as redis objects or 
 107  * simple arrays of integers, setTypeNext returns the encoding of the 
 108  * set object you are iterating, and will populate the appropriate pointer 
 109  * (eobj) or (llobj) accordingly. 
 111  * When there are no longer elements -1 is returned. 
 112  * Returned objects ref count is not incremented, so this function is 
 113  * copy on write friendly. */ 
 114 int setTypeNext(setTypeIterator 
*si
, robj 
**objele
, int64_t *llele
) { 
 115     if (si
->encoding 
== REDIS_ENCODING_HT
) { 
 116         dictEntry 
*de 
= dictNext(si
->di
); 
 117         if (de 
== NULL
) return -1; 
 118         *objele 
= dictGetKey(de
); 
 119     } else if (si
->encoding 
== REDIS_ENCODING_INTSET
) { 
 120         if (!intsetGet(si
->subject
->ptr
,si
->ii
++,llele
)) 
 126 /* The not copy on write friendly version but easy to use version 
 127  * of setTypeNext() is setTypeNextObject(), returning new objects 
 128  * or incrementing the ref count of returned objects. So if you don't 
 129  * retain a pointer to this object you should call decrRefCount() against it. 
 131  * This function is the way to go for write operations where COW is not 
 132  * an issue as the result will be anyway of incrementing the ref count. */ 
 133 robj 
*setTypeNextObject(setTypeIterator 
*si
) { 
 138     encoding 
= setTypeNext(si
,&objele
,&intele
); 
 140         case -1:    return NULL
; 
 141         case REDIS_ENCODING_INTSET
: 
 142             return createStringObjectFromLongLong(intele
); 
 143         case REDIS_ENCODING_HT
: 
 144             incrRefCount(objele
); 
 147             redisPanic("Unsupported encoding"); 
 149     return NULL
; /* just to suppress warnings */ 
 152 /* Return random element from a non empty set. 
 153  * The returned element can be a int64_t value if the set is encoded 
 154  * as an "intset" blob of integers, or a redis object if the set 
 157  * The caller provides both pointers to be populated with the right 
 158  * object. The return value of the function is the object->encoding 
 159  * field of the object and is used by the caller to check if the 
 160  * int64_t pointer or the redis object pointere was populated. 
 162  * When an object is returned (the set was a real set) the ref count 
 163  * of the object is not incremented so this function can be considered 
 164  * copy on write friendly. */ 
 165 int setTypeRandomElement(robj 
*setobj
, robj 
**objele
, int64_t *llele
) { 
 166     if (setobj
->encoding 
== REDIS_ENCODING_HT
) { 
 167         dictEntry 
*de 
= dictGetRandomKey(setobj
->ptr
); 
 168         *objele 
= dictGetKey(de
); 
 169     } else if (setobj
->encoding 
== REDIS_ENCODING_INTSET
) { 
 170         *llele 
= intsetRandom(setobj
->ptr
); 
 172         redisPanic("Unknown set encoding"); 
 174     return setobj
->encoding
; 
 177 unsigned long setTypeSize(robj 
*subject
) { 
 178     if (subject
->encoding 
== REDIS_ENCODING_HT
) { 
 179         return dictSize((dict
*)subject
->ptr
); 
 180     } else if (subject
->encoding 
== REDIS_ENCODING_INTSET
) { 
 181         return intsetLen((intset
*)subject
->ptr
); 
 183         redisPanic("Unknown set encoding"); 
 187 /* Convert the set to specified encoding. The resulting dict (when converting 
 188  * to a hashtable) is presized to hold the number of elements in the original 
 190 void setTypeConvert(robj 
*setobj
, int enc
) { 
 192     redisAssertWithInfo(NULL
,setobj
,setobj
->type 
== REDIS_SET 
&& 
 193                              setobj
->encoding 
== REDIS_ENCODING_INTSET
); 
 195     if (enc 
== REDIS_ENCODING_HT
) { 
 197         dict 
*d 
= dictCreate(&setDictType
,NULL
); 
 200         /* Presize the dict to avoid rehashing */ 
 201         dictExpand(d
,intsetLen(setobj
->ptr
)); 
 203         /* To add the elements we extract integers and create redis objects */ 
 204         si 
= setTypeInitIterator(setobj
); 
 205         while (setTypeNext(si
,NULL
,&intele
) != -1) { 
 206             element 
= createStringObjectFromLongLong(intele
); 
 207             redisAssertWithInfo(NULL
,element
,dictAdd(d
,element
,NULL
) == DICT_OK
); 
 209         setTypeReleaseIterator(si
); 
 211         setobj
->encoding 
= REDIS_ENCODING_HT
; 
 215         redisPanic("Unsupported set conversion"); 
 219 void saddCommand(redisClient 
*c
) { 
 223     set 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
 225         set 
= setTypeCreate(c
->argv
[2]); 
 226         dbAdd(c
->db
,c
->argv
[1],set
); 
 228         if (set
->type 
!= REDIS_SET
) { 
 229             addReply(c
,shared
.wrongtypeerr
); 
 234     for (j 
= 2; j 
< c
->argc
; j
++) { 
 235         c
->argv
[j
] = tryObjectEncoding(c
->argv
[j
]); 
 236         if (setTypeAdd(set
,c
->argv
[j
])) added
++; 
 238     if (added
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 239     server
.dirty 
+= added
; 
 240     addReplyLongLong(c
,added
); 
 243 void sremCommand(redisClient 
*c
) { 
 247     if ((set 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 248         checkType(c
,set
,REDIS_SET
)) return; 
 250     for (j 
= 2; j 
< c
->argc
; j
++) { 
 251         if (setTypeRemove(set
,c
->argv
[j
])) { 
 253             if (setTypeSize(set
) == 0) { 
 254                 dbDelete(c
->db
,c
->argv
[1]); 
 260         signalModifiedKey(c
->db
,c
->argv
[1]); 
 261         server
.dirty 
+= deleted
; 
 263     addReplyLongLong(c
,deleted
); 
 266 void smoveCommand(redisClient 
*c
) { 
 267     robj 
*srcset
, *dstset
, *ele
; 
 268     srcset 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
 269     dstset 
= lookupKeyWrite(c
->db
,c
->argv
[2]); 
 270     ele 
= c
->argv
[3] = tryObjectEncoding(c
->argv
[3]); 
 272     /* If the source key does not exist return 0 */ 
 273     if (srcset 
== NULL
) { 
 274         addReply(c
,shared
.czero
); 
 278     /* If the source key has the wrong type, or the destination key 
 279      * is set and has the wrong type, return with an error. */ 
 280     if (checkType(c
,srcset
,REDIS_SET
) || 
 281         (dstset 
&& checkType(c
,dstset
,REDIS_SET
))) return; 
 283     /* If srcset and dstset are equal, SMOVE is a no-op */ 
 284     if (srcset 
== dstset
) { 
 285         addReply(c
,shared
.cone
); 
 289     /* If the element cannot be removed from the src set, return 0. */ 
 290     if (!setTypeRemove(srcset
,ele
)) { 
 291         addReply(c
,shared
.czero
); 
 295     /* Remove the src set from the database when empty */ 
 296     if (setTypeSize(srcset
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 297     signalModifiedKey(c
->db
,c
->argv
[1]); 
 298     signalModifiedKey(c
->db
,c
->argv
[2]); 
 301     /* Create the destination set when it doesn't exist */ 
 303         dstset 
= setTypeCreate(ele
); 
 304         dbAdd(c
->db
,c
->argv
[2],dstset
); 
 307     /* An extra key has changed when ele was successfully added to dstset */ 
 308     if (setTypeAdd(dstset
,ele
)) server
.dirty
++; 
 309     addReply(c
,shared
.cone
); 
 312 void sismemberCommand(redisClient 
*c
) { 
 315     if ((set 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 316         checkType(c
,set
,REDIS_SET
)) return; 
 318     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 319     if (setTypeIsMember(set
,c
->argv
[2])) 
 320         addReply(c
,shared
.cone
); 
 322         addReply(c
,shared
.czero
); 
 325 void scardCommand(redisClient 
*c
) { 
 328     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 329         checkType(c
,o
,REDIS_SET
)) return; 
 331     addReplyLongLong(c
,setTypeSize(o
)); 
 334 void spopCommand(redisClient 
*c
) { 
 335     robj 
*set
, *ele
, *aux
; 
 339     if ((set 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
 340         checkType(c
,set
,REDIS_SET
)) return; 
 342     encoding 
= setTypeRandomElement(set
,&ele
,&llele
); 
 343     if (encoding 
== REDIS_ENCODING_INTSET
) { 
 344         ele 
= createStringObjectFromLongLong(llele
); 
 345         set
->ptr 
= intsetRemove(set
->ptr
,llele
,NULL
); 
 348         setTypeRemove(set
,ele
); 
 351     /* Replicate/AOF this command as an SREM operation */ 
 352     aux 
= createStringObject("SREM",4); 
 353     rewriteClientCommandVector(c
,3,aux
,c
->argv
[1],ele
); 
 358     if (setTypeSize(set
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 359     signalModifiedKey(c
->db
,c
->argv
[1]); 
 363 void srandmemberCommand(redisClient 
*c
) { 
 368     if ((set 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
 369         checkType(c
,set
,REDIS_SET
)) return; 
 371     encoding 
= setTypeRandomElement(set
,&ele
,&llele
); 
 372     if (encoding 
== REDIS_ENCODING_INTSET
) { 
 373         addReplyBulkLongLong(c
,llele
); 
 379 int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) { 
 380     return setTypeSize(*(robj
**)s1
)-setTypeSize(*(robj
**)s2
); 
 383 void sinterGenericCommand(redisClient 
*c
, robj 
**setkeys
, unsigned long setnum
, robj 
*dstkey
) { 
 384     robj 
**sets 
= zmalloc(sizeof(robj
*)*setnum
); 
 386     robj 
*eleobj
, *dstset 
= NULL
; 
 388     void *replylen 
= NULL
; 
 389     unsigned long j
, cardinality 
= 0; 
 392     for (j 
= 0; j 
< setnum
; j
++) { 
 393         robj 
*setobj 
= dstkey 
? 
 394             lookupKeyWrite(c
->db
,setkeys
[j
]) : 
 395             lookupKeyRead(c
->db
,setkeys
[j
]); 
 399                 if (dbDelete(c
->db
,dstkey
)) { 
 400                     signalModifiedKey(c
->db
,dstkey
); 
 403                 addReply(c
,shared
.czero
); 
 405                 addReply(c
,shared
.emptymultibulk
); 
 409         if (checkType(c
,setobj
,REDIS_SET
)) { 
 415     /* Sort sets from the smallest to largest, this will improve our 
 416      * algorithm's performace */ 
 417     qsort(sets
,setnum
,sizeof(robj
*),qsortCompareSetsByCardinality
); 
 419     /* The first thing we should output is the total number of elements... 
 420      * since this is a multi-bulk write, but at this stage we don't know 
 421      * the intersection set size, so we use a trick, append an empty object 
 422      * to the output list and save the pointer to later modify it with the 
 425         replylen 
= addDeferredMultiBulkLength(c
); 
 427         /* If we have a target key where to store the resulting set 
 428          * create this key with an empty set inside */ 
 429         dstset 
= createIntsetObject(); 
 432     /* Iterate all the elements of the first (smallest) set, and test 
 433      * the element against all the other sets, if at least one set does 
 434      * not include the element it is discarded */ 
 435     si 
= setTypeInitIterator(sets
[0]); 
 436     while((encoding 
= setTypeNext(si
,&eleobj
,&intobj
)) != -1) { 
 437         for (j 
= 1; j 
< setnum
; j
++) { 
 438             if (sets
[j
] == sets
[0]) continue; 
 439             if (encoding 
== REDIS_ENCODING_INTSET
) { 
 440                 /* intset with intset is simple... and fast */ 
 441                 if (sets
[j
]->encoding 
== REDIS_ENCODING_INTSET 
&& 
 442                     !intsetFind((intset
*)sets
[j
]->ptr
,intobj
)) 
 445                 /* in order to compare an integer with an object we 
 446                  * have to use the generic function, creating an object 
 448                 } else if (sets
[j
]->encoding 
== REDIS_ENCODING_HT
) { 
 449                     eleobj 
= createStringObjectFromLongLong(intobj
); 
 450                     if (!setTypeIsMember(sets
[j
],eleobj
)) { 
 451                         decrRefCount(eleobj
); 
 454                     decrRefCount(eleobj
); 
 456             } else if (encoding 
== REDIS_ENCODING_HT
) { 
 457                 /* Optimization... if the source object is integer 
 458                  * encoded AND the target set is an intset, we can get 
 459                  * a much faster path. */ 
 460                 if (eleobj
->encoding 
== REDIS_ENCODING_INT 
&& 
 461                     sets
[j
]->encoding 
== REDIS_ENCODING_INTSET 
&& 
 462                     !intsetFind((intset
*)sets
[j
]->ptr
,(long)eleobj
->ptr
)) 
 465                 /* else... object to object check is easy as we use the 
 466                  * type agnostic API here. */ 
 467                 } else if (!setTypeIsMember(sets
[j
],eleobj
)) { 
 473         /* Only take action when all sets contain the member */ 
 476                 if (encoding 
== REDIS_ENCODING_HT
) 
 477                     addReplyBulk(c
,eleobj
); 
 479                     addReplyBulkLongLong(c
,intobj
); 
 482                 if (encoding 
== REDIS_ENCODING_INTSET
) { 
 483                     eleobj 
= createStringObjectFromLongLong(intobj
); 
 484                     setTypeAdd(dstset
,eleobj
); 
 485                     decrRefCount(eleobj
); 
 487                     setTypeAdd(dstset
,eleobj
); 
 492     setTypeReleaseIterator(si
); 
 495         /* Store the resulting set into the target, if the intersection 
 496          * is not an empty set. */ 
 497         dbDelete(c
->db
,dstkey
); 
 498         if (setTypeSize(dstset
) > 0) { 
 499             dbAdd(c
->db
,dstkey
,dstset
); 
 500             addReplyLongLong(c
,setTypeSize(dstset
)); 
 502             decrRefCount(dstset
); 
 503             addReply(c
,shared
.czero
); 
 505         signalModifiedKey(c
->db
,dstkey
); 
 508         setDeferredMultiBulkLength(c
,replylen
,cardinality
); 
 513 void sinterCommand(redisClient 
*c
) { 
 514     sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
); 
 517 void sinterstoreCommand(redisClient 
*c
) { 
 518     sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]); 
 521 #define REDIS_OP_UNION 0 
 522 #define REDIS_OP_DIFF 1 
 523 #define REDIS_OP_INTER 2 
 525 void sunionDiffGenericCommand(redisClient 
*c
, robj 
**setkeys
, int setnum
, robj 
*dstkey
, int op
) { 
 526     robj 
**sets 
= zmalloc(sizeof(robj
*)*setnum
); 
 528     robj 
*ele
, *dstset 
= NULL
; 
 529     int j
, cardinality 
= 0; 
 531     for (j 
= 0; j 
< setnum
; j
++) { 
 532         robj 
*setobj 
= dstkey 
? 
 533             lookupKeyWrite(c
->db
,setkeys
[j
]) : 
 534             lookupKeyRead(c
->db
,setkeys
[j
]); 
 539         if (checkType(c
,setobj
,REDIS_SET
)) { 
 546     /* We need a temp set object to store our union. If the dstkey 
 547      * is not NULL (that is, we are inside an SUNIONSTORE operation) then 
 548      * this set object will be the resulting object to set into the target key*/ 
 549     dstset 
= createIntsetObject(); 
 551     /* Iterate all the elements of all the sets, add every element a single 
 552      * time to the result set */ 
 553     for (j 
= 0; j 
< setnum
; j
++) { 
 554         if (op 
== REDIS_OP_DIFF 
&& j 
== 0 && !sets
[j
]) break; /* result set is empty */ 
 555         if (!sets
[j
]) continue; /* non existing keys are like empty sets */ 
 557         si 
= setTypeInitIterator(sets
[j
]); 
 558         while((ele 
= setTypeNextObject(si
)) != NULL
) { 
 559             if (op 
== REDIS_OP_UNION 
|| j 
== 0) { 
 560                 if (setTypeAdd(dstset
,ele
)) { 
 563             } else if (op 
== REDIS_OP_DIFF
) { 
 564                 if (setTypeRemove(dstset
,ele
)) { 
 570         setTypeReleaseIterator(si
); 
 572         /* Exit when result set is empty. */ 
 573         if (op 
== REDIS_OP_DIFF 
&& cardinality 
== 0) break; 
 576     /* Output the content of the resulting set, if not in STORE mode */ 
 578         addReplyMultiBulkLen(c
,cardinality
); 
 579         si 
= setTypeInitIterator(dstset
); 
 580         while((ele 
= setTypeNextObject(si
)) != NULL
) { 
 584         setTypeReleaseIterator(si
); 
 585         decrRefCount(dstset
); 
 587         /* If we have a target key where to store the resulting set 
 588          * create this key with the result set inside */ 
 589         dbDelete(c
->db
,dstkey
); 
 590         if (setTypeSize(dstset
) > 0) { 
 591             dbAdd(c
->db
,dstkey
,dstset
); 
 592             addReplyLongLong(c
,setTypeSize(dstset
)); 
 594             decrRefCount(dstset
); 
 595             addReply(c
,shared
.czero
); 
 597         signalModifiedKey(c
->db
,dstkey
); 
 603 void sunionCommand(redisClient 
*c
) { 
 604     sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
); 
 607 void sunionstoreCommand(redisClient 
*c
) { 
 608     sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
); 
 611 void sdiffCommand(redisClient 
*c
) { 
 612     sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
); 
 615 void sdiffstoreCommand(redisClient 
*c
) { 
 616     sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);