int port;
int fd;
redisDb *db;
- dict *sharingpool; /* Poll used for object sharing */
- unsigned int sharingpoolsize;
long long dirty; /* changes to DB from the last save */
list *clients;
list *slaves, *monitors;
/* Our shared "common" objects */
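+/* Number of small integer objects (0 .. REDIS_SHARED_INTEGERS-1) that are
+ * preallocated at startup and shared instead of being allocated on demand. */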
+#define REDIS_SHARED_INTEGERS 10000
struct sharedObjectsStruct {
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
*colon, *nullbulk, *nullmultibulk, *queued,
*select0, *select1, *select2, *select3, *select4,
*select5, *select6, *select7, *select8, *select9,
*messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
- *psubscribebulk, *punsubscribebulk;
+ *psubscribebulk, *punsubscribebulk, *integers[REDIS_SHARED_INTEGERS];
} shared;
/* Global vars that are actually used as constants. The following double
static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
-static robj *tryObjectSharing(robj *o);
-static int tryObjectEncoding(robj *o);
+static robj *tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static void zrankCommand(redisClient *c);
static void zrevrankCommand(redisClient *c);
static void hsetCommand(redisClient *c);
+static void hmsetCommand(redisClient *c);
static void hgetCommand(redisClient *c);
static void hdelCommand(redisClient *c);
static void hlenCommand(redisClient *c);
{"zrank",zrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
{"zrevrank",zrevrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
{"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
+ {"hmset",hmsetCommand,-4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
{"hincrby",hincrbyCommand,4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1},
{"hget",hgetCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
{"hdel",hdelCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
redisLog(REDIS_WARNING, "Background saving error");
} else {
redisLog(REDIS_WARNING,
- "Background saving terminated by signal");
+ "Background saving terminated by signal %d", WTERMSIG(statloc));
rdbRemoveTempFile(server.bgsavechildpid);
}
server.bgsavechildpid = -1;
redisLog(REDIS_WARNING, "Background append only file rewriting error");
} else {
redisLog(REDIS_WARNING,
- "Background append only file rewriting terminated by signal");
+ "Background append only file rewriting terminated by signal %d",
+ WTERMSIG(statloc));
}
cleanup:
sdsfree(server.bgrewritebuf);
server.bgrewritechildpid = -1;
}
+/* This function is called once a background process of some kind terminates,
+ * as we want to avoid resizing the hash tables when there is a child in order
+ * to play well with copy-on-write (otherwise when a resize happens lots of
+ * memory pages are copied). The goal of this function is to update the ability
+ * for dict.c to resize the hash tables according to whether or not we have
+ * running children. */
+static void updateDictResizePolicy(void) {
+ if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
+ dictEnableResize();
+ else
+ dictDisableResize();
+}
+
static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j, loops = server.cronloops++;
REDIS_NOTUSED(eventLoop);
 * if we resize the HT while a saving child is at work, the resulting
 * memory movements in the parent will cause a lot of pages to be
 * copied. */
- if (server.bgsavechildpid == -1 && !(loops % 10)) tryResizeHashTables();
+ if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1 &&
+ !(loops % 10))
+ {
+ tryResizeHashTables();
+ }
/* Show information about connected clients */
if (!(loops % 50)) {
- redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
+ redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
- zmalloc_used_memory(),
- dictSize(server.sharingpool));
+ zmalloc_used_memory());
}
/* Close connections of timedout clients */
} else {
backgroundRewriteDoneHandler(statloc);
}
+ updateDictResizePolicy();
}
} else {
/* If there is not a background saving in progress check if
}
static void createSharedObjects(void) {
+ int j;
+
shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
shared.mbulk3 = createStringObject("*3\r\n",4);
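+    /* Preallocate the shared integers: the value is stored directly in the
+     * object pointer field (REDIS_ENCODING_INT), so no sds string is needed. */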
+ for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
+ shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
+ shared.integers[j]->encoding = REDIS_ENCODING_INT;
+ }
}
static void appendServerSaveParams(time_t seconds, int changes) {
server.requirepass = NULL;
server.shareobjects = 0;
server.rdbcompression = 1;
- server.sharingpoolsize = 1024;
server.maxclients = 0;
server.blpop_blocked_clients = 0;
server.maxmemory = 0;
createSharedObjects();
server.el = aeCreateEventLoop();
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
- server.sharingpool = dictCreate(&setDictType,NULL);
server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
if (server.fd == -1) {
redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
- } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
- server.sharingpoolsize = atoi(argv[1]);
- if (server.sharingpoolsize < 1) {
- err = "invalid object sharing pool size"; goto loaderr;
- }
} else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
if ((server.daemonize = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
return 1;
}
}
- /* Let's try to share objects on the command arguments vector */
- if (server.shareobjects) {
- int j;
- for(j = 1; j < c->argc; j++)
- c->argv[j] = tryObjectSharing(c->argv[j]);
- }
/* Let's try to encode the bulk object to save space. */
if (cmd->flags & REDIS_CMD_BULK)
- tryObjectEncoding(c->argv[c->argc-1]);
+ c->argv[c->argc-1] = tryObjectEncoding(c->argv[c->argc-1]);
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
addReplySds(c,sdsnewlen(buf,len));
}
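+/* Add a ":<value>\r\n" integer reply to the client, reusing the shared
+ * :0 and :1 objects for the most common values. */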
+static void addReplyLongLong(redisClient *c, long long ll) {
+ char buf[128];
+ size_t len;
+
+ if (ll == 0) {
+ addReply(c,shared.czero);
+ return;
+ } else if (ll == 1) {
+ addReply(c,shared.cone);
+ return;
+ }
+ len = snprintf(buf,sizeof(buf),":%lld\r\n",ll);
+ addReplySds(c,sdsnewlen(buf,len));
+}
+
static void addReplyUlong(redisClient *c, unsigned long ul) {
char buf[128];
size_t len;
}
static void incrRefCount(robj *o) {
- redisAssert(!server.vm_enabled || o->storage == REDIS_VM_MEMORY);
o->refcount++;
}
if (server.vm_enabled &&
(o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
{
- if (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING) {
- redisAssert(o->refcount == 1);
- }
if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(obj);
redisAssert(o->type == REDIS_STRING);
freeStringObject(o);
return retval == DICT_OK;
}
-/* Try to share an object against the shared objects pool */
-static robj *tryObjectSharing(robj *o) {
- struct dictEntry *de;
- unsigned long c;
-
- if (o == NULL || server.shareobjects == 0) return o;
-
- redisAssert(o->type == REDIS_STRING);
- de = dictFind(server.sharingpool,o);
- if (de) {
- robj *shared = dictGetEntryKey(de);
-
- c = ((unsigned long) dictGetEntryVal(de))+1;
- dictGetEntryVal(de) = (void*) c;
- incrRefCount(shared);
- decrRefCount(o);
- return shared;
- } else {
- /* Here we are using a stream algorihtm: Every time an object is
- * shared we increment its count, everytime there is a miss we
- * recrement the counter of a random object. If this object reaches
- * zero we remove the object and put the current object instead. */
- if (dictSize(server.sharingpool) >=
- server.sharingpoolsize) {
- de = dictGetRandomKey(server.sharingpool);
- redisAssert(de != NULL);
- c = ((unsigned long) dictGetEntryVal(de))-1;
- dictGetEntryVal(de) = (void*) c;
- if (c == 0) {
- dictDelete(server.sharingpool,de->key);
- }
- } else {
- c = 0; /* If the pool is empty we want to add this object */
- }
- if (c == 0) {
- int retval;
-
- retval = dictAdd(server.sharingpool,o,(void*)1);
- redisAssert(retval == DICT_OK);
- incrRefCount(o);
- }
- return o;
- }
-}
-
/* Check if the nul-terminated string 's' can be represented by a long
* (that is, is a number that fits into long without any other space or
* character before or after the digits).
}
/* Try to encode a string object in order to save space */
-static int tryObjectEncoding(robj *o) {
+static robj *tryObjectEncoding(robj *o) {
long value;
sds s = o->ptr;
if (o->encoding != REDIS_ENCODING_RAW)
- return REDIS_ERR; /* Already encoded */
+ return o; /* Already encoded */
- /* It's not save to encode shared objects: shared objects can be shared
+ /* It's not safe to encode shared objects: shared objects can be shared
* everywhere in the "object space" of Redis. Encoded objects can only
* appear as "values" (and not, for instance, as keys) */
- if (o->refcount > 1) return REDIS_ERR;
+ if (o->refcount > 1) return o;
/* Currently we try to encode only strings */
redisAssert(o->type == REDIS_STRING);
/* Check if we can represent this string as a long integer */
- if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
+ if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return o;
/* Ok, this object can be encoded */
- o->encoding = REDIS_ENCODING_INT;
- sdsfree(o->ptr);
- o->ptr = (void*) value;
- return REDIS_OK;
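+    /* If the value is in the shared integers range, discard this object and
+     * return the preallocated shared integer instead of encoding in place. */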
+ if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
+ decrRefCount(o);
+ incrRefCount(shared.integers[value]);
+ return shared.integers[value];
+ } else {
+ o->encoding = REDIS_ENCODING_INT;
+ sdsfree(o->ptr);
+ o->ptr = (void*) value;
+ return o;
+ }
}
/* Get a decoded version of an encoded object (returned as a new object).
}
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
server.bgsavechildpid = childpid;
+ updateDictResizePolicy();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
case REDIS_RDB_ENC_INT8:
case REDIS_RDB_ENC_INT16:
case REDIS_RDB_ENC_INT32:
- return tryObjectSharing(rdbLoadIntegerObject(fp,len));
+ return rdbLoadIntegerObject(fp,len);
case REDIS_RDB_ENC_LZF:
- return tryObjectSharing(rdbLoadLzfStringObject(fp));
+ return rdbLoadLzfStringObject(fp);
default:
redisAssert(0);
}
sdsfree(val);
return NULL;
}
- return tryObjectSharing(createObject(REDIS_STRING,val));
+ return createObject(REDIS_STRING,val);
}
/* For information about double serialization check rdbSaveDoubleValue() */
if (type == REDIS_STRING) {
/* Read string value */
if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
- tryObjectEncoding(o);
+ o = tryObjectEncoding(o);
} else if (type == REDIS_LIST || type == REDIS_SET) {
/* Read list/set value */
uint32_t listlen;
robj *ele;
if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
- tryObjectEncoding(ele);
+ ele = tryObjectEncoding(ele);
if (type == REDIS_LIST) {
listAddNodeTail((list*)o->ptr,ele);
} else {
double *score = zmalloc(sizeof(double));
if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
- tryObjectEncoding(ele);
+ ele = tryObjectEncoding(ele);
if (rdbLoadDoubleValue(fp,score) == -1) return NULL;
dictAdd(zs->dict,ele,score);
zslInsert(zs->zsl,*score,ele);
decrRefCount(key);
decrRefCount(val);
} else {
- tryObjectEncoding(key);
- tryObjectEncoding(val);
+ key = tryObjectEncoding(key);
+ val = tryObjectEncoding(val);
dictAdd((dict*)o->ptr,key,val);
}
}
for (j = 1; j < c->argc; j += 2) {
int retval;
- tryObjectEncoding(c->argv[j+1]);
+ c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
if (retval == DICT_ERR) {
dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
value += incr;
o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
- tryObjectEncoding(o);
+ o = tryObjectEncoding(o);
retval = dictAdd(c->db->dict,c->argv[1],o);
if (retval == DICT_ERR) {
dictReplace(c->db->dict,c->argv[1],o);
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
- return level;
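+    /* Cap the result at ZSKIPLIST_MAXLEVEL since the skiplist header is
+     * created with that many levels. */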
+ return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
static void zslInsert(zskiplist *zsl, double score, robj *obj) {
if (zipmapLen(zm) > server.hash_max_zipmap_entries)
convertToRealHash(o);
} else {
- tryObjectEncoding(c->argv[2]);
+ c->argv[2] = tryObjectEncoding(c->argv[2]);
/* note that c->argv[3] is already encoded, as the last arg
 * of a bulk command is always integer encoded if possible. */
if (dictReplace(o->ptr,c->argv[2],c->argv[3])) {
addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",update == 0));
}
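+/* HMSET key field1 value1 [field2 value2 ...]
+ *
+ * Set multiple hash fields in a single command and reply with +OK.
+ * The command table arity of -4 guarantees at least one field/value pair. */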
+static void hmsetCommand(redisClient *c) {
+ int i;
+ robj *o, *key, *val;
+
+ if ((c->argc % 2) == 1) {
+ addReplySds(c,sdsnew("-ERR wrong number of arguments for HMSET\r\n"));
+ return;
+ }
+
+ if ((o = lookupKeyWrite(c->db,c->argv[1])) == NULL) {
+ o = createHashObject();
+ dictAdd(c->db->dict,c->argv[1],o);
+ incrRefCount(c->argv[1]);
+ } else {
+ if (o->type != REDIS_HASH) {
+ addReply(c,shared.wrongtypeerr);
+ return;
+ }
+ }
+
+    /* We want to convert the zipmap into a hash table right away if any
+     * of the entries to be added is too big. */
+ if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+ for (i = 2; i < c->argc; i+=2) {
+ if ((c->argv[i]->encoding == REDIS_ENCODING_RAW &&
+ sdslen(c->argv[i]->ptr) > server.hash_max_zipmap_value) ||
+ (c->argv[i+1]->encoding == REDIS_ENCODING_RAW &&
+ sdslen(c->argv[i+1]->ptr) > server.hash_max_zipmap_value)) {
+ convertToRealHash(o);
+ break;
+ }
+ }
+ }
+
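+    /* Add every field/value pair. With the zipmap encoding we store the
+     * decoded (raw string) representation, while with a real hash table we
+     * store the objects themselves, encoded when possible and assigned back
+     * to argv so that reference counting stays consistent. */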
+ if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+ unsigned char *zm = o->ptr;
+
+ for (i = 2; i < c->argc; i+=2) {
+ key = getDecodedObject(c->argv[i]);
+ val = getDecodedObject(c->argv[i+1]);
+ zm = zipmapSet(zm,key->ptr,sdslen(key->ptr),
+ val->ptr,sdslen(val->ptr),NULL);
+ decrRefCount(key);
+ decrRefCount(val);
+ o->ptr = zm;
+ }
+
+        /* Here is the second check for hash conversion: too many entries. */
+ if (zipmapLen(zm) > server.hash_max_zipmap_entries)
+ convertToRealHash(o);
+ } else {
+ for (i = 2; i < c->argc; i+=2) {
+            key = c->argv[i] = tryObjectEncoding(c->argv[i]);
+            val = c->argv[i+1] = tryObjectEncoding(c->argv[i+1]);
+ if (dictReplace(o->ptr,key,val)) {
+ incrRefCount(key);
+ }
+ incrRefCount(val);
+ }
+ }
+
+ addReply(c, shared.ok);
+}
+
static void hincrbyCommand(redisClient *c) {
long long value = 0, incr = 0;
robj *o = lookupKeyWrite(c->db,c->argv[1]);
value += incr;
hval = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
- tryObjectEncoding(hval);
+ hval = tryObjectEncoding(hval);
if (dictReplace(o->ptr,c->argv[2],hval)) {
incrRefCount(c->argv[2]);
}
}
server.dirty++;
- addReplyLong(c, value);
+ addReplyLongLong(c, value);
}
static void hgetCommand(redisClient *c) {
keyobj = createStringObject((char*)key,klen);
valobj = createStringObject((char*)val,vlen);
- tryObjectEncoding(keyobj);
- tryObjectEncoding(valobj);
+ keyobj = tryObjectEncoding(keyobj);
+ valobj = tryObjectEncoding(valobj);
dictAdd(dict,keyobj,valobj);
}
o->encoding = REDIS_ENCODING_HT;
redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
exit(1);
}
- /* Try object sharing and encoding */
- if (server.shareobjects) {
- int j;
- for(j = 1; j < argc; j++)
- argv[j] = tryObjectSharing(argv[j]);
- }
+ /* Try object encoding */
if (cmd->flags & REDIS_CMD_BULK)
- tryObjectEncoding(argv[argc-1]);
+ argv[argc-1] = tryObjectEncoding(argv[argc-1]);
/* Run the command in the context of a fake client */
fakeClient->argc = argc;
fakeClient->argv = argv;
redisLog(REDIS_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
server.bgrewritechildpid = childpid;
+ updateDictResizePolicy();
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.bgrewritebuf will start
j->type == REDIS_IOJOB_DO_SWAP ||
j->type == REDIS_IOJOB_LOAD) && j->val != NULL)
decrRefCount(j->val);
- decrRefCount(j->key);
+    /* We don't decrRefCount the j->key field as we didn't increment
+     * the count when creating IO Jobs. This is because the key field here
+     * is just used as an identifier and if a key is removed the Job should
+     * never be touched again. */
zfree(j);
}
iojob *job = ln->value;
if (job->canceled) continue; /* Skip this, already canceled. */
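+            /* Jobs now store the key object by reference (no dupStringObject),
+             * so pointer comparison is enough to identify the job to cancel. */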
- if (compareStringObjects(job->key,o) == 0) {
+ if (job->key == o) {
redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n",
(void*)job, (char*)o->ptr, job->type, i);
/* Mark the pages as free since the swap didn't happened
j = zmalloc(sizeof(*j));
j->type = REDIS_IOJOB_PREPARE_SWAP;
j->db = db;
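+    /* The key is stored by reference and its refcount is not incremented,
+     * see the comment in freeIOJob(). */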
- j->key = dupStringObject(key);
+ j->key = key;
j->val = val;
incrRefCount(val);
j->canceled = 0;
j = zmalloc(sizeof(*j));
j->type = REDIS_IOJOB_LOAD;
j->db = c->db;
- j->key = dupStringObject(key);
+ j->key = o;
j->key->vtype = o->vtype;
j->page = o->vm.page;
j->val = NULL;
(void*)key, key->refcount, (unsigned long long) key->vm.page,
(unsigned long long) key->vm.usedpages));
}
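+    /* DEBUG SWAPIN <key>: force a swapped-out value to be loaded back into
+     * memory by simply performing a read lookup against the key. */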
+ } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
+ lookupKeyRead(c->db,c->argv[2]);
+ addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
dictEntry *de = dictFind(c->db->dict,c->argv[2]);
robj *key, *val;
}
} else {
addReplySds(c,sdsnew(
- "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPOUT <key>|RELOAD]\r\n"));
+ "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]\r\n"));
}
}