]> git.saurik.com Git - redis.git/blobdiff - src/object.c
patch config.h for sync_file_range
[redis.git] / src / object.c
index f4c34dcfd8ab039d2f5801632b5dfff9bff38ae7..ba7ea323a6f2cbddd1f3cfafbc1cd93722fc6019 100644 (file)
@@ -1,6 +1,6 @@
 #include "redis.h"
-#include <pthread.h>
 #include <math.h>
+#include <ctype.h>
 
 robj *createObject(int type, void *ptr) {
     robj *o = zmalloc(sizeof(*o));
@@ -9,19 +9,8 @@ robj *createObject(int type, void *ptr) {
     o->ptr = ptr;
     o->refcount = 1;
 
-    /* Set the LRU to the current lruclock (minutes resolution).
-     * We do this regardless of the fact VM is active as LRU is also
-     * used for the maxmemory directive when Redis is used as cache.
-     *
-     * Note that this code may run in the context of an I/O thread
-     * and accessing server.lruclock in theory is an error
-     * (no locks). But in practice this is safe, and even if we read
-     * garbage Redis will not fail. */
+    /* Set the LRU to the current lruclock (minutes resolution). */
     o->lru = server.lruclock;
-    /* The following is only needed if VM is active, but since the conditional
-     * is probably more costly than initializing the field it's better to
-     * have every field properly initialized anyway. */
-    o->storage = REDIS_DS_MEMORY;
     return o;
 }
 
@@ -31,8 +20,7 @@ robj *createStringObject(char *ptr, size_t len) {
 
 robj *createStringObjectFromLongLong(long long value) {
     robj *o;
-    if (value >= 0 && value < REDIS_SHARED_INTEGERS &&
-        pthread_equal(pthread_self(),server.mainthread)) {
+    if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
         incrRefCount(shared.integers[value]);
         o = shared.integers[value];
     } else {
@@ -47,8 +35,32 @@ robj *createStringObjectFromLongLong(long long value) {
     return o;
 }
 
+/* Note: this function is defined into object.c since here it is where it
+ * belongs but it is actually designed to be used just for INCRBYFLOAT */
+robj *createStringObjectFromLongDouble(long double value) {
+    char buf[256];
+    int len;
+
+    /* We use 17 digits precision since with 128 bit floats that precision
+     * after rouding is able to represent most small decimal numbers in a way
+     * that is "non surprising" for the user (that is, most small decimal
+     * numbers will be represented in a way that when converted back into
+     * a string are exactly the same as what the user typed.) */
+    len = snprintf(buf,sizeof(buf),"%.17Lf", value);
+    /* Now remove trailing zeroes after the '.' */
+    if (strchr(buf,'.') != NULL) {
+        char *p = buf+len-1;
+        while(*p == '0') {
+            p--;
+            len--;
+        }
+        if (*p == '.') len--;
+    }
+    return createStringObject(buf,len);
+}
+
 robj *dupStringObject(robj *o) {
-    redisAssert(o->encoding == REDIS_ENCODING_RAW);
+    redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
     return createStringObject(o->ptr,sdslen(o->ptr));
 }
 
@@ -82,21 +94,28 @@ robj *createIntsetObject(void) {
 }
 
 robj *createHashObject(void) {
-    /* All the Hashes start as zipmaps. Will be automatically converted
-     * into hash tables if there are enough elements or big elements
-     * inside. */
-    unsigned char *zm = zipmapNew();
-    robj *o = createObject(REDIS_HASH,zm);
-    o->encoding = REDIS_ENCODING_ZIPMAP;
+    unsigned char *zl = ziplistNew();
+    robj *o = createObject(REDIS_HASH, zl);
+    o->encoding = REDIS_ENCODING_ZIPLIST;
     return o;
 }
 
 robj *createZsetObject(void) {
     zset *zs = zmalloc(sizeof(*zs));
+    robj *o;
 
     zs->dict = dictCreate(&zsetDictType,NULL);
     zs->zsl = zslCreate();
-    return createObject(REDIS_ZSET,zs);
+    o = createObject(REDIS_ZSET,zs);
+    o->encoding = REDIS_ENCODING_SKIPLIST;
+    return o;
+}
+
+robj *createZsetZiplistObject(void) {
+    unsigned char *zl = ziplistNew();
+    robj *o = createObject(REDIS_ZSET,zl);
+    o->encoding = REDIS_ENCODING_ZIPLIST;
+    return o;
 }
 
 void freeStringObject(robj *o) {
@@ -132,11 +151,20 @@ void freeSetObject(robj *o) {
 }
 
 void freeZsetObject(robj *o) {
-    zset *zs = o->ptr;
-
-    dictRelease(zs->dict);
-    zslFree(zs->zsl);
-    zfree(zs);
+    zset *zs;
+    switch (o->encoding) {
+    case REDIS_ENCODING_SKIPLIST:
+        zs = o->ptr;
+        dictRelease(zs->dict);
+        zslFree(zs->zsl);
+        zfree(zs);
+        break;
+    case REDIS_ENCODING_ZIPLIST:
+        zfree(o->ptr);
+        break;
+    default:
+        redisPanic("Unknown sorted set encoding");
+    }
 }
 
 void freeHashObject(robj *o) {
@@ -144,7 +172,7 @@ void freeHashObject(robj *o) {
     case REDIS_ENCODING_HT:
         dictRelease((dict*) o->ptr);
         break;
-    case REDIS_ENCODING_ZIPMAP:
+    case REDIS_ENCODING_ZIPLIST:
         zfree(o->ptr);
         break;
     default:
@@ -161,10 +189,7 @@ void decrRefCount(void *obj) {
     robj *o = obj;
 
     if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
-    if (--(o->refcount) == 0) {
-        /* DS_SAVING objects should always have a reference in the
-         * IO Job structure. So we should never reach this state. */
-        redisAssert(o->storage != REDIS_DS_SAVING);
+    if (o->refcount == 1) {
         switch(o->type) {
         case REDIS_STRING: freeStringObject(o); break;
         case REDIS_LIST: freeListObject(o); break;
@@ -173,11 +198,29 @@ void decrRefCount(void *obj) {
         case REDIS_HASH: freeHashObject(o); break;
         default: redisPanic("Unknown object type"); break;
         }
-        o->ptr = NULL; /* defensive programming. We'll see NULL in traces. */
         zfree(o);
+    } else {
+        o->refcount--;
     }
 }
 
+/* This function set the ref count to zero without freeing the object.
+ * It is useful in order to pass a new object to functions incrementing
+ * the ref count of the received object. Example:
+ *
+ *    functionThatWillIncrementRefCount(resetRefCount(CreateObject(...)));
+ *
+ * Otherwise you need to resort to the less elegant pattern:
+ *
+ *    *obj = createObject(...);
+ *    functionThatWillIncrementRefCount(obj);
+ *    decrRefCount(obj);
+ */
+robj *resetRefCount(robj *obj) {
+    obj->refcount = 0;
+    return obj;
+}
+
 int checkType(redisClient *c, robj *o, int type) {
     if (o->type != type) {
         addReply(c,shared.wrongtypeerr);
@@ -186,6 +229,16 @@ int checkType(redisClient *c, robj *o, int type) {
     return 0;
 }
 
+int isObjectRepresentableAsLongLong(robj *o, long long *llval) {
+    redisAssertWithInfo(NULL,o,o->type == REDIS_STRING);
+    if (o->encoding == REDIS_ENCODING_INT) {
+        if (llval) *llval = (long) o->ptr;
+        return REDIS_OK;
+    } else {
+        return string2ll(o->ptr,sdslen(o->ptr),llval) ? REDIS_OK : REDIS_ERR;
+    }
+}
+
 /* Try to encode a string object in order to save space */
 robj *tryObjectEncoding(robj *o) {
     long value;
@@ -200,24 +253,19 @@ robj *tryObjectEncoding(robj *o) {
      if (o->refcount > 1) return o;
 
     /* Currently we try to encode only strings */
-    redisAssert(o->type == REDIS_STRING);
+    redisAssertWithInfo(NULL,o,o->type == REDIS_STRING);
 
     /* Check if we can represent this string as a long integer */
-    if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return o;
+    if (!string2l(s,sdslen(s),&value)) return o;
 
     /* Ok, this object can be encoded...
      *
-     * Can I use a shared object? Only if the object is inside a given
-     * range and if the back end in use is in-memory. For disk store every
-     * object in memory used as value should be independent.
+     * Can I use a shared object? Only if the object is inside a given range
      *
      * Note that we also avoid using shared integers when maxmemory is used
      * because every object needs to have a private LRU field for the LRU
      * algorithm to work well. */
-    if (server.ds_enabled == 0 &&
-        server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
-        pthread_equal(pthread_self(),server.mainthread))
-    {
+    if (server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS) {
         decrRefCount(o);
         incrRefCount(shared.integers[value]);
         return shared.integers[value];
@@ -258,7 +306,7 @@ robj *getDecodedObject(robj *o) {
  * sdscmp() from sds.c will apply memcmp() so this function ca be considered
  * binary safe. */
 int compareStringObjects(robj *a, robj *b) {
-    redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
+    redisAssertWithInfo(NULL,a,a->type == REDIS_STRING && b->type == REDIS_STRING);
     char bufa[128], bufb[128], *astr, *bstr;
     int bothsds = 1;
 
@@ -293,7 +341,7 @@ int equalStringObjects(robj *a, robj *b) {
 }
 
 size_t stringObjectLen(robj *o) {
-    redisAssert(o->type == REDIS_STRING);
+    redisAssertWithInfo(NULL,o,o->type == REDIS_STRING);
     if (o->encoding == REDIS_ENCODING_RAW) {
         return sdslen(o->ptr);
     } else {
@@ -310,17 +358,19 @@ int getDoubleFromObject(robj *o, double *target) {
     if (o == NULL) {
         value = 0;
     } else {
-        redisAssert(o->type == REDIS_STRING);
+        redisAssertWithInfo(NULL,o,o->type == REDIS_STRING);
         if (o->encoding == REDIS_ENCODING_RAW) {
+            errno = 0;
             value = strtod(o->ptr, &eptr);
-            if (eptr[0] != '\0' || isnan(value)) return REDIS_ERR;
+            if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' ||
+                errno == ERANGE || isnan(value))
+                return REDIS_ERR;
         } else if (o->encoding == REDIS_ENCODING_INT) {
             value = (long)o->ptr;
         } else {
             redisPanic("Unknown string encoding");
         }
     }
-
     *target = value;
     return REDIS_OK;
 }
@@ -331,11 +381,48 @@ int getDoubleFromObjectOrReply(redisClient *c, robj *o, double *target, const ch
         if (msg != NULL) {
             addReplyError(c,(char*)msg);
         } else {
-            addReplyError(c,"value is not a double");
+            addReplyError(c,"value is not a valid float");
         }
         return REDIS_ERR;
     }
+    *target = value;
+    return REDIS_OK;
+}
+
+int getLongDoubleFromObject(robj *o, long double *target) {
+    long double value;
+    char *eptr;
+
+    if (o == NULL) {
+        value = 0;
+    } else {
+        redisAssertWithInfo(NULL,o,o->type == REDIS_STRING);
+        if (o->encoding == REDIS_ENCODING_RAW) {
+            errno = 0;
+            value = strtold(o->ptr, &eptr);
+            if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' ||
+                errno == ERANGE || isnan(value))
+                return REDIS_ERR;
+        } else if (o->encoding == REDIS_ENCODING_INT) {
+            value = (long)o->ptr;
+        } else {
+            redisPanic("Unknown string encoding");
+        }
+    }
+    *target = value;
+    return REDIS_OK;
+}
 
+int getLongDoubleFromObjectOrReply(redisClient *c, robj *o, long double *target, const char *msg) {
+    long double value;
+    if (getLongDoubleFromObject(o, &value) != REDIS_OK) {
+        if (msg != NULL) {
+            addReplyError(c,(char*)msg);
+        } else {
+            addReplyError(c,"value is not a valid float");
+        }
+        return REDIS_ERR;
+    }
     *target = value;
     return REDIS_OK;
 }
@@ -347,11 +434,12 @@ int getLongLongFromObject(robj *o, long long *target) {
     if (o == NULL) {
         value = 0;
     } else {
-        redisAssert(o->type == REDIS_STRING);
+        redisAssertWithInfo(NULL,o,o->type == REDIS_STRING);
         if (o->encoding == REDIS_ENCODING_RAW) {
+            errno = 0;
             value = strtoll(o->ptr, &eptr, 10);
-            if (eptr[0] != '\0') return REDIS_ERR;
-            if (errno == ERANGE && (value == LLONG_MIN || value == LLONG_MAX))
+            if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' ||
+                errno == ERANGE)
                 return REDIS_ERR;
         } else if (o->encoding == REDIS_ENCODING_INT) {
             value = (long)o->ptr;
@@ -359,7 +447,6 @@ int getLongLongFromObject(robj *o, long long *target) {
             redisPanic("Unknown string encoding");
         }
     }
-
     if (target) *target = value;
     return REDIS_OK;
 }
@@ -374,7 +461,6 @@ int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, con
         }
         return REDIS_ERR;
     }
-
     *target = value;
     return REDIS_OK;
 }
@@ -391,7 +477,6 @@ int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *
         }
         return REDIS_ERR;
     }
-
     *target = value;
     return REDIS_OK;
 }
@@ -401,10 +486,10 @@ char *strEncoding(int encoding) {
     case REDIS_ENCODING_RAW: return "raw";
     case REDIS_ENCODING_INT: return "int";
     case REDIS_ENCODING_HT: return "hashtable";
-    case REDIS_ENCODING_ZIPMAP: return "zipmap";
     case REDIS_ENCODING_LINKEDLIST: return "linkedlist";
     case REDIS_ENCODING_ZIPLIST: return "ziplist";
     case REDIS_ENCODING_INTSET: return "intset";
+    case REDIS_ENCODING_SKIPLIST: return "skiplist";
     default: return "unknown";
     }
 }
@@ -419,3 +504,42 @@ unsigned long estimateObjectIdleTime(robj *o) {
                     REDIS_LRU_CLOCK_RESOLUTION;
     }
 }
+
+/* This is an helper function for the DEBUG command. We need to lookup keys
+ * without any modification of LRU or other parameters. */
+robj *objectCommandLookup(redisClient *c, robj *key) {
+    dictEntry *de;
+
+    if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL;
+    return (robj*) dictGetVal(de);
+}
+
+robj *objectCommandLookupOrReply(redisClient *c, robj *key, robj *reply) {
+    robj *o = objectCommandLookup(c,key);
+
+    if (!o) addReply(c, reply);
+    return o;
+}
+
+/* Object command allows to inspect the internals of an Redis Object.
+ * Usage: OBJECT <verb> ... arguments ... */
+void objectCommand(redisClient *c) {
+    robj *o;
+
+    if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
+        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
+                == NULL) return;
+        addReplyLongLong(c,o->refcount);
+    } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
+        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
+                == NULL) return;
+        addReplyBulkCString(c,strEncoding(o->encoding));
+    } else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
+        if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
+                == NULL) return;
+        addReplyLongLong(c,estimateObjectIdleTime(o));
+    } else {
+        addReplyError(c,"Syntax error. Try OBJECT (refcount|encoding|idletime)");
+    }
+}
+