]> git.saurik.com Git - redis.git/blobdiff - src/t_hash.c
fixed diskstore race condition
[redis.git] / src / t_hash.c
index 5cef1cabbcd86addfd6133fcbd2c6261e2750ae5..5e7525bd13a383f38dc54f1420783c01a4acaf64 100644 (file)
@@ -31,27 +31,56 @@ void hashTypeTryObjectEncoding(robj *subject, robj **o1, robj **o2) {
     }
 }
 
     }
 }
 
-/* Get the value from a hash identified by key. Returns either a string
- * object or NULL if the value cannot be found. The refcount of the object
- * is always increased by 1 when the value was found. */
-robj *hashTypeGet(robj *o, robj *key) {
-    robj *value = NULL;
+/* Get the value from a hash identified by key.
+ *
+ * If the string is found either REDIS_ENCODING_HT or REDIS_ENCODING_ZIPMAP
+ * is returned, and either **objval or **v and *vlen are set accordingly,
+ * so that objects in hash tables are returend as objects and pointers
+ * inside a zipmap are returned as such.
+ *
+ * If the object was not found -1 is returned.
+ *
+ * This function is copy on write friendly as there is no incr/decr
+ * of refcount needed if objects are accessed just for reading operations. */
+int hashTypeGet(robj *o, robj *key, robj **objval, unsigned char **v,
+                unsigned int *vlen)
+{
     if (o->encoding == REDIS_ENCODING_ZIPMAP) {
     if (o->encoding == REDIS_ENCODING_ZIPMAP) {
-        unsigned char *v;
-        unsigned int vlen;
+        int found;
+
         key = getDecodedObject(key);
         key = getDecodedObject(key);
-        if (zipmapGet(o->ptr,key->ptr,sdslen(key->ptr),&v,&vlen)) {
-            value = createStringObject((char*)v,vlen);
-        }
+        found = zipmapGet(o->ptr,key->ptr,sdslen(key->ptr),v,vlen);
         decrRefCount(key);
         decrRefCount(key);
+        if (!found) return -1;
     } else {
         dictEntry *de = dictFind(o->ptr,key);
     } else {
         dictEntry *de = dictFind(o->ptr,key);
-        if (de != NULL) {
-            value = dictGetEntryVal(de);
-            incrRefCount(value);
-        }
+        if (de == NULL) return -1;
+        *objval = dictGetEntryVal(de);
+    }
+    return o->encoding;
+}
+
+/* Higher level function of hashTypeGet() that always returns a Redis
+ * object (either new or with refcount incremented), so that the caller
+ * can retain a reference or call decrRefCount after the usage.
+ *
+ * The lower level function can prevent copy on write so it is
+ * the preferred way of doing read operations. */
+robj *hashTypeGetObject(robj *o, robj *key) {
+    robj *objval;
+    unsigned char *v;
+    unsigned int vlen;
+
+    int encoding = hashTypeGet(o,key,&objval,&v,&vlen);
+    switch(encoding) {
+        case REDIS_ENCODING_HT:
+            incrRefCount(objval);
+            return objval;
+        case REDIS_ENCODING_ZIPMAP:
+            objval = createStringObject((char*)v,vlen);
+            return objval;
+        default: return NULL;
     }
     }
-    return value;
 }
 
 /* Test if the key exists in the given hash. Returns 1 if the key
 }
 
 /* Test if the key exists in the given hash. Returns 1 if the key
@@ -156,24 +185,50 @@ int hashTypeNext(hashTypeIterator *hi) {
 }
 
 /* Get key or value object at current iteration position.
 }
 
 /* Get key or value object at current iteration position.
- * This increases the refcount of the field object by 1. */
-robj *hashTypeCurrent(hashTypeIterator *hi, int what) {
-    robj *o;
+ * The returned item differs with the hash object encoding:
+ * - When encoding is REDIS_ENCODING_HT, the objval pointer is populated
+ *   with the original object.
+ * - When encoding is REDIS_ENCODING_ZIPMAP, a pointer to the string and
+ *   its length is retunred populating the v and vlen pointers.
+ * This function is copy on write friendly as accessing objects in read only
+ * does not require writing to any memory page.
+ *
+ * The function returns the encoding of the object, so that the caller
+ * can underestand if the key or value was returned as object or C string. */
+int hashTypeCurrent(hashTypeIterator *hi, int what, robj **objval, unsigned char **v, unsigned int *vlen) {
     if (hi->encoding == REDIS_ENCODING_ZIPMAP) {
         if (what & REDIS_HASH_KEY) {
     if (hi->encoding == REDIS_ENCODING_ZIPMAP) {
         if (what & REDIS_HASH_KEY) {
-            o = createStringObject((char*)hi->zk,hi->zklen);
+            *v = hi->zk;
+            *vlen = hi->zklen;
         } else {
         } else {
-            o = createStringObject((char*)hi->zv,hi->zvlen);
+            *v = hi->zv;
+            *vlen = hi->zvlen;
         }
     } else {
         }
     } else {
-        if (what & REDIS_HASH_KEY) {
-            o = dictGetEntryKey(hi->de);
-        } else {
-            o = dictGetEntryVal(hi->de);
-        }
-        incrRefCount(o);
+        if (what & REDIS_HASH_KEY)
+            *objval = dictGetEntryKey(hi->de);
+        else
+            *objval = dictGetEntryVal(hi->de);
+    }
+    return hi->encoding;
+}
+
+/* A non copy-on-write friendly but higher level version of hashTypeCurrent()
+ * that always returns an object with refcount incremented by one (or a new
+ * object), so it's up to the caller to decrRefCount() the object if no
+ * reference is retained. */
+robj *hashTypeCurrentObject(hashTypeIterator *hi, int what) {
+    robj *obj;
+    unsigned char *v = NULL;
+    unsigned int vlen = 0;
+    int encoding = hashTypeCurrent(hi,what,&obj,&v,&vlen);
+
+    if (encoding == REDIS_ENCODING_HT) {
+        incrRefCount(obj);
+        return obj;
+    } else {
+        return createStringObject((char*)v,vlen);
     }
     }
-    return o;
 }
 
 robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key) {
 }
 
 robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key) {
@@ -224,7 +279,7 @@ void hsetCommand(redisClient *c) {
     hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
     update = hashTypeSet(o,c->argv[2],c->argv[3]);
     addReply(c, update ? shared.czero : shared.cone);
     hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
     update = hashTypeSet(o,c->argv[2],c->argv[3]);
     addReply(c, update ? shared.czero : shared.cone);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
 }
 
     server.dirty++;
 }
 
@@ -239,7 +294,7 @@ void hsetnxCommand(redisClient *c) {
         hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
         hashTypeSet(o,c->argv[2],c->argv[3]);
         addReply(c, shared.cone);
         hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
         hashTypeSet(o,c->argv[2],c->argv[3]);
         addReply(c, shared.cone);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
         server.dirty++;
     }
 }
@@ -260,7 +315,7 @@ void hmsetCommand(redisClient *c) {
         hashTypeSet(o,c->argv[i],c->argv[i+1]);
     }
     addReply(c, shared.ok);
         hashTypeSet(o,c->argv[i],c->argv[i+1]);
     }
     addReply(c, shared.ok);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
 }
 
     server.dirty++;
 }
 
@@ -270,7 +325,7 @@ void hincrbyCommand(redisClient *c) {
 
     if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return;
     if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
 
     if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return;
     if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
-    if ((current = hashTypeGet(o,c->argv[2])) != NULL) {
+    if ((current = hashTypeGetObject(o,c->argv[2])) != NULL) {
         if (getLongLongFromObjectOrReply(c,current,&value,
             "hash value is not an integer") != REDIS_OK) {
             decrRefCount(current);
         if (getLongLongFromObjectOrReply(c,current,&value,
             "hash value is not an integer") != REDIS_OK) {
             decrRefCount(current);
@@ -287,29 +342,39 @@ void hincrbyCommand(redisClient *c) {
     hashTypeSet(o,c->argv[2],new);
     decrRefCount(new);
     addReplyLongLong(c,value);
     hashTypeSet(o,c->argv[2],new);
     decrRefCount(new);
     addReplyLongLong(c,value);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
 }
 
 void hgetCommand(redisClient *c) {
     robj *o, *value;
     server.dirty++;
 }
 
 void hgetCommand(redisClient *c) {
     robj *o, *value;
+    unsigned char *v;
+    unsigned int vlen;
+    int encoding;
+
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,o,REDIS_HASH)) return;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,o,REDIS_HASH)) return;
 
-    if ((value = hashTypeGet(o,c->argv[2])) != NULL) {
-        addReplyBulk(c,value);
-        decrRefCount(value);
+    if ((encoding = hashTypeGet(o,c->argv[2],&value,&v,&vlen)) != -1) {
+        if (encoding == REDIS_ENCODING_HT)
+            addReplyBulk(c,value);
+        else
+            addReplyBulkCBuffer(c,v,vlen);
     } else {
         addReply(c,shared.nullbulk);
     }
 }
 
 void hmgetCommand(redisClient *c) {
     } else {
         addReply(c,shared.nullbulk);
     }
 }
 
 void hmgetCommand(redisClient *c) {
-    int i;
+    int i, encoding;
     robj *o, *value;
     robj *o, *value;
+    unsigned char *v;
+    unsigned int vlen;
+
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o != NULL && o->type != REDIS_HASH) {
         addReply(c,shared.wrongtypeerr);
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o != NULL && o->type != REDIS_HASH) {
         addReply(c,shared.wrongtypeerr);
+        return;
     }
 
     /* Note the check for o != NULL happens inside the loop. This is
     }
 
     /* Note the check for o != NULL happens inside the loop. This is
@@ -317,9 +382,12 @@ void hmgetCommand(redisClient *c) {
      * an empty hash. The reply should then be a series of NULLs. */
     addReplyMultiBulkLen(c,c->argc-2);
     for (i = 2; i < c->argc; i++) {
      * an empty hash. The reply should then be a series of NULLs. */
     addReplyMultiBulkLen(c,c->argc-2);
     for (i = 2; i < c->argc; i++) {
-        if (o != NULL && (value = hashTypeGet(o,c->argv[i])) != NULL) {
-            addReplyBulk(c,value);
-            decrRefCount(value);
+        if (o != NULL &&
+            (encoding = hashTypeGet(o,c->argv[i],&value,&v,&vlen)) != -1) {
+            if (encoding == REDIS_ENCODING_HT)
+                addReplyBulk(c,value);
+            else
+                addReplyBulkCBuffer(c,v,vlen);
         } else {
             addReply(c,shared.nullbulk);
         }
         } else {
             addReply(c,shared.nullbulk);
         }
@@ -334,7 +402,7 @@ void hdelCommand(redisClient *c) {
     if (hashTypeDelete(o,c->argv[2])) {
         if (hashTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
         addReply(c,shared.cone);
     if (hashTypeDelete(o,c->argv[2])) {
         if (hashTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
         addReply(c,shared.cone);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     } else {
         addReply(c,shared.czero);
         server.dirty++;
     } else {
         addReply(c,shared.czero);
@@ -350,7 +418,7 @@ void hlenCommand(redisClient *c) {
 }
 
 void genericHgetallCommand(redisClient *c, int flags) {
 }
 
 void genericHgetallCommand(redisClient *c, int flags) {
-    robj *o, *obj;
+    robj *o;
     unsigned long count = 0;
     hashTypeIterator *hi;
     void *replylen = NULL;
     unsigned long count = 0;
     hashTypeIterator *hi;
     void *replylen = NULL;
@@ -361,16 +429,25 @@ void genericHgetallCommand(redisClient *c, int flags) {
     replylen = addDeferredMultiBulkLength(c);
     hi = hashTypeInitIterator(o);
     while (hashTypeNext(hi) != REDIS_ERR) {
     replylen = addDeferredMultiBulkLength(c);
     hi = hashTypeInitIterator(o);
     while (hashTypeNext(hi) != REDIS_ERR) {
+        robj *obj;
+        unsigned char *v = NULL;
+        unsigned int vlen = 0;
+        int encoding;
+
         if (flags & REDIS_HASH_KEY) {
         if (flags & REDIS_HASH_KEY) {
-            obj = hashTypeCurrent(hi,REDIS_HASH_KEY);
-            addReplyBulk(c,obj);
-            decrRefCount(obj);
+            encoding = hashTypeCurrent(hi,REDIS_HASH_KEY,&obj,&v,&vlen);
+            if (encoding == REDIS_ENCODING_HT)
+                addReplyBulk(c,obj);
+            else
+                addReplyBulkCBuffer(c,v,vlen);
             count++;
         }
         if (flags & REDIS_HASH_VALUE) {
             count++;
         }
         if (flags & REDIS_HASH_VALUE) {
-            obj = hashTypeCurrent(hi,REDIS_HASH_VALUE);
-            addReplyBulk(c,obj);
-            decrRefCount(obj);
+            encoding = hashTypeCurrent(hi,REDIS_HASH_VALUE,&obj,&v,&vlen);
+            if (encoding == REDIS_ENCODING_HT)
+                addReplyBulk(c,obj);
+            else
+                addReplyBulkCBuffer(c,v,vlen);
             count++;
         }
     }
             count++;
         }
     }