COW friendly versions of SPOP and SRANDMEMBER commands, with some change to the set...
authorantirez <antirez@gmail.com>
Thu, 9 Dec 2010 09:21:02 +0000 (10:21 +0100)
committerantirez <antirez@gmail.com>
Thu, 9 Dec 2010 09:21:02 +0000 (10:21 +0100)
src/intset.c
src/intset.h
src/redis.h
src/t_set.c

index 2f359b7ffa249ad2c1c4c557c6986b38c909a7e7..bfd3307d2a242bd17279312dbfc02906bf73c9ca 100644 (file)
@@ -179,7 +179,7 @@ intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
 }
 
 /* Delete integer from intset */
-intset *intsetRemove(intset *is, int64_t value, uint8_t *success) {
+intset *intsetRemove(intset *is, int64_t value, int *success) {
     uint8_t valenc = _intsetValueEncoding(value);
     uint32_t pos;
     if (success) *success = 0;
index 25afc18d12b4b155c481082d6f3e1b4aa10d65e2..10d49d2e08a0cca8a499a7bffa5be01bdb4d9a26 100644 (file)
@@ -10,7 +10,7 @@ typedef struct intset {
 
 intset *intsetNew(void);
 intset *intsetAdd(intset *is, int64_t value, uint8_t *success);
-intset *intsetRemove(intset *is, int64_t value, uint8_t *success);
+intset *intsetRemove(intset *is, int64_t value, int *success);
 uint8_t intsetFind(intset *is, int64_t value);
 int64_t intsetRandom(intset *is);
 uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value);
index e5db917e44ef1219947b4497139bd3eadb25258c..e012db4c4da9ba009464bcd6cd01e4365f23c0c4 100644 (file)
@@ -814,7 +814,7 @@ int setTypeIsMember(robj *subject, robj *value);
 setTypeIterator *setTypeInitIterator(robj *subject);
 void setTypeReleaseIterator(setTypeIterator *si);
 robj *setTypeNext(setTypeIterator *si);
-robj *setTypeRandomElement(robj *subject);
+int setTypeRandomElement(robj *setobj, robj **objele, long long *llele);
 unsigned long setTypeSize(robj *subject);
 void setTypeConvert(robj *subject, int enc);
 
index 234efc7de6291fa90b9e913f9c10961502715864..e15952c05c6c9c0979dc878113fc325bdd14d6ba 100644 (file)
@@ -47,17 +47,17 @@ int setTypeAdd(robj *subject, robj *value) {
     return 0;
 }
 
-int setTypeRemove(robj *subject, robj *value) {
+int setTypeRemove(robj *setobj, robj *value) {
     long long llval;
-    if (subject->encoding == REDIS_ENCODING_HT) {
-        if (dictDelete(subject->ptr,value) == DICT_OK) {
-            if (htNeedsResize(subject->ptr)) dictResize(subject->ptr);
+    if (setobj->encoding == REDIS_ENCODING_HT) {
+        if (dictDelete(setobj->ptr,value) == DICT_OK) {
+            if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr);
             return 1;
         }
-    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
+    } else if (setobj->encoding == REDIS_ENCODING_INTSET) {
         if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
-            uint8_t success;
-            subject->ptr = intsetRemove(subject->ptr,llval,&success);
+            int success;
+            setobj->ptr = intsetRemove(setobj->ptr,llval,&success);
             if (success) return 1;
         }
     } else {
@@ -120,21 +120,29 @@ robj *setTypeNext(setTypeIterator *si) {
 }
 
 
-/* Return random element from set. The returned object will always have
- * an incremented refcount. */
-robj *setTypeRandomElement(robj *subject) {
-    robj *ret = NULL;
-    if (subject->encoding == REDIS_ENCODING_HT) {
-        dictEntry *de = dictGetRandomKey(subject->ptr);
-        ret = dictGetEntryKey(de);
-        incrRefCount(ret);
-    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
-        long long llval = intsetRandom(subject->ptr);
-        ret = createStringObjectFromLongLong(llval);
+/* Return random element from a non empty set.
+ * The returned element can be a long long value if the set is encoded
+ * as an "intset" blob of integers, or a redis object if the set
+ * is a regular set.
+ *
+ * The caller provides both pointers to be populated with the right
+ * object. The return value of the function is the object->encoding
+ * field of the object and is used by the caller to check if the
+ * long long pointer or the redis object pointere was populated.
+ *
+ * When an object is returned (the set was a real set) the ref count
+ * of the object is not incremented so this function can be considered
+ * copy-on-write friendly. */
+int setTypeRandomElement(robj *setobj, robj **objele, long long *llele) {
+    if (setobj->encoding == REDIS_ENCODING_HT) {
+        dictEntry *de = dictGetRandomKey(setobj->ptr);
+        *objele = dictGetEntryKey(de);
+    } else if (setobj->encoding == REDIS_ENCODING_INTSET) {
+        *llele = intsetRandom(setobj->ptr);
     } else {
         redisPanic("Unknown set encoding");
     }
-    return ret;
+    return setobj->encoding;
 }
 
 unsigned long setTypeSize(robj *subject) {
@@ -284,35 +292,38 @@ void scardCommand(redisClient *c) {
 
 void spopCommand(redisClient *c) {
     robj *set, *ele;
+    long long llele;
+    int encoding;
 
     if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
-    ele = setTypeRandomElement(set);
-    if (ele == NULL) {
-        addReply(c,shared.nullbulk);
+    encoding = setTypeRandomElement(set,&ele,&llele);
+    if (encoding == REDIS_ENCODING_INTSET) {
+        addReplyBulkLongLong(c,llele);
+        set->ptr = intsetRemove(set->ptr,llele,NULL);
     } else {
-        setTypeRemove(set,ele);
         addReplyBulk(c,ele);
-        decrRefCount(ele);
-        if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
-        touchWatchedKey(c->db,c->argv[1]);
-        server.dirty++;
+        setTypeRemove(set,ele);
     }
+    if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
+    touchWatchedKey(c->db,c->argv[1]);
+    server.dirty++;
 }
 
 void srandmemberCommand(redisClient *c) {
     robj *set, *ele;
+    long long llele;
+    int encoding;
 
     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
-    ele = setTypeRandomElement(set);
-    if (ele == NULL) {
-        addReply(c,shared.nullbulk);
+    encoding = setTypeRandomElement(set,&ele,&llele);
+    if (encoding == REDIS_ENCODING_INTSET) {
+        addReplyBulkLongLong(c,llele);
     } else {
         addReplyBulk(c,ele);
-        decrRefCount(ele);
     }
 }