3 /*----------------------------------------------------------------------------- 
   5  *----------------------------------------------------------------------------*/ 
   7 /* Factory method to return a set that *can* hold "value". When the object has 
   8  * an integer-encodable value, an intset will be returned. Otherwise a regular 
  10 robj 
*setTypeCreate(robj 
*value
) { 
  11     if (getLongLongFromObject(value
,NULL
) == REDIS_OK
) 
  12         return createIntsetObject(); 
  13     return createSetObject(); 
  16 int setTypeAdd(robj 
*subject
, robj 
*value
) { 
  18     if (subject
->encoding 
== REDIS_ENCODING_HT
) { 
  19         if (dictAdd(subject
->ptr
,value
,NULL
) == DICT_OK
) { 
  23     } else if (subject
->encoding 
== REDIS_ENCODING_INTSET
) { 
  24         if (getLongLongFromObject(value
,&llval
) == REDIS_OK
) { 
  26             subject
->ptr 
= intsetAdd(subject
->ptr
,llval
,&success
); 
  28                 /* Convert to regular set when the intset contains 
  29                  * too many entries. */ 
  30                 if (intsetLen(subject
->ptr
) > server
.set_max_intset_entries
) 
  31                     setTypeConvert(subject
,REDIS_ENCODING_HT
); 
  35             /* Failed to get integer from object, convert to regular set. */ 
  36             setTypeConvert(subject
,REDIS_ENCODING_HT
); 
  38             /* The set *was* an intset and this value is not integer 
  39              * encodable, so dictAdd should always work. */ 
  40             redisAssert(dictAdd(subject
->ptr
,value
,NULL
) == DICT_OK
); 
  45         redisPanic("Unknown set encoding"); 
  50 int setTypeRemove(robj 
*subject
, robj 
*value
) { 
  52     if (subject
->encoding 
== REDIS_ENCODING_HT
) { 
  53         if (dictDelete(subject
->ptr
,value
) == DICT_OK
) { 
  54             if (htNeedsResize(subject
->ptr
)) dictResize(subject
->ptr
); 
  57     } else if (subject
->encoding 
== REDIS_ENCODING_INTSET
) { 
  58         if (getLongLongFromObject(value
,&llval
) == REDIS_OK
) { 
  60             subject
->ptr 
= intsetRemove(subject
->ptr
,llval
,&success
); 
  61             if (success
) return 1; 
  64         redisPanic("Unknown set encoding"); 
  69 int setTypeIsMember(robj 
*subject
, robj 
*value
) { 
  71     if (subject
->encoding 
== REDIS_ENCODING_HT
) { 
  72         return dictFind((dict
*)subject
->ptr
,value
) != NULL
; 
  73     } else if (subject
->encoding 
== REDIS_ENCODING_INTSET
) { 
  74         if (getLongLongFromObject(value
,&llval
) == REDIS_OK
) { 
  75             return intsetFind((intset
*)subject
->ptr
,llval
); 
  78         redisPanic("Unknown set encoding"); 
  83 setTypeIterator 
*setTypeInitIterator(robj 
*subject
) { 
  84     setTypeIterator 
*si 
= zmalloc(sizeof(setTypeIterator
)); 
  85     si
->subject 
= subject
; 
  86     si
->encoding 
= subject
->encoding
; 
  87     if (si
->encoding 
== REDIS_ENCODING_HT
) { 
  88         si
->di 
= dictGetIterator(subject
->ptr
); 
  89     } else if (si
->encoding 
== REDIS_ENCODING_INTSET
) { 
  92         redisPanic("Unknown set encoding"); 
  97 void setTypeReleaseIterator(setTypeIterator 
*si
) { 
  98     if (si
->encoding 
== REDIS_ENCODING_HT
) 
  99         dictReleaseIterator(si
->di
); 
 103 /* Move to the next entry in the set. Returns the object at the current 
 104  * position, or NULL when the end is reached. This object will have its 
 105  * refcount incremented, so the caller needs to take care of this. */ 
 106 robj 
*setTypeNext(setTypeIterator 
*si
) { 
 108     if (si
->encoding 
== REDIS_ENCODING_HT
) { 
 109         dictEntry 
*de 
= dictNext(si
->di
); 
 111             ret 
= dictGetEntryKey(de
); 
 114     } else if (si
->encoding 
== REDIS_ENCODING_INTSET
) { 
 116         if (intsetGet(si
->subject
->ptr
,si
->ii
++,&llval
)) 
 117             ret 
= createStringObjectFromLongLong(llval
); 
 123 /* Return random element from set. The returned object will always have 
 124  * an incremented refcount. */ 
 125 robj 
*setTypeRandomElement(robj 
*subject
) { 
 127     if (subject
->encoding 
== REDIS_ENCODING_HT
) { 
 128         dictEntry 
*de 
= dictGetRandomKey(subject
->ptr
); 
 129         ret 
= dictGetEntryKey(de
); 
 131     } else if (subject
->encoding 
== REDIS_ENCODING_INTSET
) { 
 132         long long llval 
= intsetRandom(subject
->ptr
); 
 133         ret 
= createStringObjectFromLongLong(llval
); 
 135         redisPanic("Unknown set encoding"); 
 140 unsigned long setTypeSize(robj 
*subject
) { 
 141     if (subject
->encoding 
== REDIS_ENCODING_HT
) { 
 142         return dictSize((dict
*)subject
->ptr
); 
 143     } else if (subject
->encoding 
== REDIS_ENCODING_INTSET
) { 
 144         return intsetLen((intset
*)subject
->ptr
); 
 146         redisPanic("Unknown set encoding"); 
 150 /* Convert the set to specified encoding. The resulting dict (when converting 
 151  * to a hashtable) is presized to hold the number of elements in the original 
 153 void setTypeConvert(robj 
*subject
, int enc
) { 
 156     redisAssert(subject
->type 
== REDIS_SET
); 
 158     if (enc 
== REDIS_ENCODING_HT
) { 
 159         dict 
*d 
= dictCreate(&setDictType
,NULL
); 
 160         /* Presize the dict to avoid rehashing */ 
 161         dictExpand(d
,intsetLen(subject
->ptr
)); 
 163         /* setTypeGet returns a robj with incremented refcount */ 
 164         si 
= setTypeInitIterator(subject
); 
 165         while ((element 
= setTypeNext(si
)) != NULL
) 
 166             redisAssert(dictAdd(d
,element
,NULL
) == DICT_OK
); 
 167         setTypeReleaseIterator(si
); 
 169         subject
->encoding 
= REDIS_ENCODING_HT
; 
 173         redisPanic("Unsupported set conversion"); 
 177 void saddCommand(redisClient 
*c
) { 
 180     set 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
 182         set 
= setTypeCreate(c
->argv
[2]); 
 183         dbAdd(c
->db
,c
->argv
[1],set
); 
 185         if (set
->type 
!= REDIS_SET
) { 
 186             addReply(c
,shared
.wrongtypeerr
); 
 190     if (setTypeAdd(set
,c
->argv
[2])) { 
 191         touchWatchedKey(c
->db
,c
->argv
[1]); 
 193         addReply(c
,shared
.cone
); 
 195         addReply(c
,shared
.czero
); 
 199 void sremCommand(redisClient 
*c
) { 
 202     if ((set 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 203         checkType(c
,set
,REDIS_SET
)) return; 
 205     if (setTypeRemove(set
,c
->argv
[2])) { 
 206         if (setTypeSize(set
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 207         touchWatchedKey(c
->db
,c
->argv
[1]); 
 209         addReply(c
,shared
.cone
); 
 211         addReply(c
,shared
.czero
); 
 215 void smoveCommand(redisClient 
*c
) { 
 216     robj 
*srcset
, *dstset
, *ele
; 
 217     srcset 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
 218     dstset 
= lookupKeyWrite(c
->db
,c
->argv
[2]); 
 221     /* If the source key does not exist return 0 */ 
 222     if (srcset 
== NULL
) { 
 223         addReply(c
,shared
.czero
); 
 227     /* If the source key has the wrong type, or the destination key 
 228      * is set and has the wrong type, return with an error. */ 
 229     if (checkType(c
,srcset
,REDIS_SET
) || 
 230         (dstset 
&& checkType(c
,dstset
,REDIS_SET
))) return; 
 232     /* If srcset and dstset are equal, SMOVE is a no-op */ 
 233     if (srcset 
== dstset
) { 
 234         addReply(c
,shared
.cone
); 
 238     /* If the element cannot be removed from the src set, return 0. */ 
 239     if (!setTypeRemove(srcset
,ele
)) { 
 240         addReply(c
,shared
.czero
); 
 244     /* Remove the src set from the database when empty */ 
 245     if (setTypeSize(srcset
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 246     touchWatchedKey(c
->db
,c
->argv
[1]); 
 247     touchWatchedKey(c
->db
,c
->argv
[2]); 
 250     /* Create the destination set when it doesn't exist */ 
 252         dstset 
= setTypeCreate(ele
); 
 253         dbAdd(c
->db
,c
->argv
[2],dstset
); 
 256     /* An extra key has changed when ele was successfully added to dstset */ 
 257     if (setTypeAdd(dstset
,ele
)) server
.dirty
++; 
 258     addReply(c
,shared
.cone
); 
 261 void sismemberCommand(redisClient 
*c
) { 
 264     if ((set 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 265         checkType(c
,set
,REDIS_SET
)) return; 
 267     if (setTypeIsMember(set
,c
->argv
[2])) 
 268         addReply(c
,shared
.cone
); 
 270         addReply(c
,shared
.czero
); 
 273 void scardCommand(redisClient 
*c
) { 
 276     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 277         checkType(c
,o
,REDIS_SET
)) return; 
 279     addReplyUlong(c
,setTypeSize(o
)); 
 282 void spopCommand(redisClient 
*c
) { 
 285     if ((set 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
 286         checkType(c
,set
,REDIS_SET
)) return; 
 288     ele 
= setTypeRandomElement(set
); 
 290         addReply(c
,shared
.nullbulk
); 
 292         setTypeRemove(set
,ele
); 
 295         if (setTypeSize(set
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 296         touchWatchedKey(c
->db
,c
->argv
[1]); 
 301 void srandmemberCommand(redisClient 
*c
) { 
 304     if ((set 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
 305         checkType(c
,set
,REDIS_SET
)) return; 
 307     ele 
= setTypeRandomElement(set
); 
 309         addReply(c
,shared
.nullbulk
); 
 316 int qsortCompareSetsByCardinality(const void *s1
, const void *s2
) { 
 317     return setTypeSize(*(robj
**)s1
)-setTypeSize(*(robj
**)s2
); 
 320 void sinterGenericCommand(redisClient 
*c
, robj 
**setkeys
, unsigned long setnum
, robj 
*dstkey
) { 
 321     robj 
**sets 
= zmalloc(sizeof(robj
*)*setnum
); 
 323     robj 
*ele
, *lenobj 
= NULL
, *dstset 
= NULL
; 
 324     unsigned long j
, cardinality 
= 0; 
 326     for (j 
= 0; j 
< setnum
; j
++) { 
 327         robj 
*setobj 
= dstkey 
? 
 328             lookupKeyWrite(c
->db
,setkeys
[j
]) : 
 329             lookupKeyRead(c
->db
,setkeys
[j
]); 
 333                 if (dbDelete(c
->db
,dstkey
)) { 
 334                     touchWatchedKey(c
->db
,dstkey
); 
 337                 addReply(c
,shared
.czero
); 
 339                 addReply(c
,shared
.emptymultibulk
); 
 343         if (checkType(c
,setobj
,REDIS_SET
)) { 
 349     /* Sort sets from the smallest to largest, this will improve our 
 350      * algorithm's performace */ 
 351     qsort(sets
,setnum
,sizeof(robj
*),qsortCompareSetsByCardinality
); 
 353     /* The first thing we should output is the total number of elements... 
 354      * since this is a multi-bulk write, but at this stage we don't know 
 355      * the intersection set size, so we use a trick, append an empty object 
 356      * to the output list and save the pointer to later modify it with the 
 359         lenobj 
= createObject(REDIS_STRING
,NULL
); 
 361         decrRefCount(lenobj
); 
 363         /* If we have a target key where to store the resulting set 
 364          * create this key with an empty set inside */ 
 365         dstset 
= createIntsetObject(); 
 368     /* Iterate all the elements of the first (smallest) set, and test 
 369      * the element against all the other sets, if at least one set does 
 370      * not include the element it is discarded */ 
 371     si 
= setTypeInitIterator(sets
[0]); 
 372     while((ele 
= setTypeNext(si
)) != NULL
) { 
 373         for (j 
= 1; j 
< setnum
; j
++) 
 374             if (!setTypeIsMember(sets
[j
],ele
)) break; 
 376         /* Only take action when all sets contain the member */ 
 382                 setTypeAdd(dstset
,ele
); 
 387     setTypeReleaseIterator(si
); 
 390         /* Store the resulting set into the target, if the intersection 
 391          * is not an empty set. */ 
 392         dbDelete(c
->db
,dstkey
); 
 393         if (setTypeSize(dstset
) > 0) { 
 394             dbAdd(c
->db
,dstkey
,dstset
); 
 395             addReplyLongLong(c
,setTypeSize(dstset
)); 
 397             decrRefCount(dstset
); 
 398             addReply(c
,shared
.czero
); 
 400         touchWatchedKey(c
->db
,dstkey
); 
 403         lenobj
->ptr 
= sdscatprintf(sdsempty(),"*%lu\r\n",cardinality
); 
 408 void sinterCommand(redisClient 
*c
) { 
 409     sinterGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
); 
 412 void sinterstoreCommand(redisClient 
*c
) { 
 413     sinterGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1]); 
 416 #define REDIS_OP_UNION 0 
 417 #define REDIS_OP_DIFF 1 
 418 #define REDIS_OP_INTER 2 
 420 void sunionDiffGenericCommand(redisClient 
*c
, robj 
**setkeys
, int setnum
, robj 
*dstkey
, int op
) { 
 421     robj 
**sets 
= zmalloc(sizeof(robj
*)*setnum
); 
 423     robj 
*ele
, *dstset 
= NULL
; 
 424     int j
, cardinality 
= 0; 
 426     for (j 
= 0; j 
< setnum
; j
++) { 
 427         robj 
*setobj 
= dstkey 
? 
 428             lookupKeyWrite(c
->db
,setkeys
[j
]) : 
 429             lookupKeyRead(c
->db
,setkeys
[j
]); 
 434         if (checkType(c
,setobj
,REDIS_SET
)) { 
 441     /* We need a temp set object to store our union. If the dstkey 
 442      * is not NULL (that is, we are inside an SUNIONSTORE operation) then 
 443      * this set object will be the resulting object to set into the target key*/ 
 444     dstset 
= createIntsetObject(); 
 446     /* Iterate all the elements of all the sets, add every element a single 
 447      * time to the result set */ 
 448     for (j 
= 0; j 
< setnum
; j
++) { 
 449         if (op 
== REDIS_OP_DIFF 
&& j 
== 0 && !sets
[j
]) break; /* result set is empty */ 
 450         if (!sets
[j
]) continue; /* non existing keys are like empty sets */ 
 452         si 
= setTypeInitIterator(sets
[j
]); 
 453         while((ele 
= setTypeNext(si
)) != NULL
) { 
 454             if (op 
== REDIS_OP_UNION 
|| j 
== 0) { 
 455                 if (setTypeAdd(dstset
,ele
)) { 
 458             } else if (op 
== REDIS_OP_DIFF
) { 
 459                 if (setTypeRemove(dstset
,ele
)) { 
 465         setTypeReleaseIterator(si
); 
 467         /* Exit when result set is empty. */ 
 468         if (op 
== REDIS_OP_DIFF 
&& cardinality 
== 0) break; 
 471     /* Output the content of the resulting set, if not in STORE mode */ 
 473         addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",cardinality
)); 
 474         si 
= setTypeInitIterator(dstset
); 
 475         while((ele 
= setTypeNext(si
)) != NULL
) { 
 479         setTypeReleaseIterator(si
); 
 480         decrRefCount(dstset
); 
 482         /* If we have a target key where to store the resulting set 
 483          * create this key with the result set inside */ 
 484         dbDelete(c
->db
,dstkey
); 
 485         if (setTypeSize(dstset
) > 0) { 
 486             dbAdd(c
->db
,dstkey
,dstset
); 
 487             addReplyLongLong(c
,setTypeSize(dstset
)); 
 489             decrRefCount(dstset
); 
 490             addReply(c
,shared
.czero
); 
 492         touchWatchedKey(c
->db
,dstkey
); 
 498 void sunionCommand(redisClient 
*c
) { 
 499     sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_UNION
); 
 502 void sunionstoreCommand(redisClient 
*c
) { 
 503     sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_UNION
); 
 506 void sdiffCommand(redisClient 
*c
) { 
 507     sunionDiffGenericCommand(c
,c
->argv
+1,c
->argc
-1,NULL
,REDIS_OP_DIFF
); 
 510 void sdiffstoreCommand(redisClient 
*c
) { 
 511     sunionDiffGenericCommand(c
,c
->argv
+2,c
->argc
-2,c
->argv
[1],REDIS_OP_DIFF
);