From 10c43610dee70032db40008996790bdeb54632b6 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Mar 2009 21:00:48 +0100 Subject: [PATCH] Nasty bug of the new DB format fixed, objects sharing implemented --- TODO | 1 + redis.c | 119 +++++++++++++++++++++++++++++++++++++++-------------- redis.conf | 6 +++ 3 files changed, 95 insertions(+), 31 deletions(-) diff --git a/TODO b/TODO index b1e8919c..bb137a8d 100644 --- a/TODO +++ b/TODO @@ -8,5 +8,6 @@ - check 'server.dirty' everywere - replication automated tests - a command, or an external tool, to perform the MD5SUM of the whole dataset, so that if the dataset between two servers is identical, so will be the MD5SUM +- objects sharing, "objectsharing yes", "objectsharingpool 1024" * Include Lua and Perl bindings diff --git a/redis.c b/redis.c index e967d1f7..65579c95 100644 --- a/redis.c +++ b/redis.c @@ -95,10 +95,10 @@ * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow - * 11|000000 [64 bit integer] => if it's 11, a full 64 bit len will follow + * 11|000000 reserved for future uses * - * 64 bit lengths are not used currently. Lenghts up to 63 are stored using - * a single byte, most DB keys, and may values, will fit inside. */ + * Lenghts up to 63 are stored using a single byte, most DB keys, and may + * values, will fit inside. */ #define REDIS_RDB_6BITLEN 0 #define REDIS_RDB_14BITLEN 1 #define REDIS_RDB_32BITLEN 2 @@ -173,6 +173,8 @@ struct redisServer { int port; int fd; dict **dict; + dict *sharingpool; + unsigned int sharingpoolsize; long long dirty; /* changes to DB from the last save */ list *clients; list *slaves, *monitors; @@ -199,6 +201,7 @@ struct redisServer { char *logfile; char *bindaddr; char *dbfilename; + int shareobjects; /* Replication related */ int isslave; char *masterhost; @@ -258,6 +261,7 @@ static int rdbSaveBackground(char *filename); static robj *createStringObject(char *ptr, size_t len); static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); +static robj *tryObjectSharing(robj *o); static void pingCommand(redisClient *c); static void echoCommand(redisClient *c); @@ -627,7 +631,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %d bytes in use", listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), - server.usedmemory); + server.usedmemory, + dictGetHashTableUsed(server.sharingpool)); } /* Close connections of timedout clients */ @@ -739,6 +744,7 @@ static void initServerConfig() { server.daemonize = 0; server.pidfile = "/var/run/redis.pid"; server.dbfilename = "dump.rdb"; + server.shareobjects = 0; ResetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -765,6 +771,8 @@ static void initServer() { createSharedObjects(); server.el = aeCreateEventLoop(); server.dict = zmalloc(sizeof(dict*)*server.dbnum); + server.sharingpool = dictCreate(&setDictType,NULL); + server.sharingpoolsize = 1024; if (!server.dict || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist) oom("server initialization"); /* Fatal OOM */ server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); @@ -772,11 +780,8 @@ static void initServer() { redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr); exit(1); } - for (j = 0; j < server.dbnum; j++) { + for (j = 0; j < server.dbnum; j++) server.dict[j] = dictCreate(&hashDictType,NULL); - if (!server.dict[j]) - oom("dictCreate"); /* Fatal OOM */ - } server.cronloops = 0; server.bgsaveinprogress = 0; server.lastsave = time(NULL); @@ -895,6 +900,13 @@ static void loadServerConfig(char *filename) { else { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcmp(argv[0],"shareobjects") && argc == 2) { + sdstolower(argv[1]); + if (!strcmp(argv[1],"yes")) server.shareobjects = 1; + else if (!strcmp(argv[1],"no")) server.shareobjects = 0; + else { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcmp(argv[0],"daemonize") && argc == 2) { sdstolower(argv[1]); if (!strcmp(argv[1],"yes")) server.daemonize = 1; @@ -1102,6 +1114,12 @@ static int processCommand(redisClient *c) { return 1; } } + /* Let's try to share objects on the command arguments vector */ + if (server.shareobjects) { + int j; + for(j = 1; j < c->argc; j++) + c->argv[j] = tryObjectSharing(c->argv[j]); + } /* Exec the command */ dirty = server.dirty; cmd->proc(c); @@ -1412,6 +1430,51 @@ static void decrRefCount(void *obj) { } } +/* Try to share an object against the shared objects pool */ +static robj *tryObjectSharing(robj *o) { + struct dictEntry *de; + unsigned long c; + + if (server.shareobjects == 0) return o; + + assert(o->type == REDIS_STRING); + de = dictFind(server.sharingpool,o); + if (de) { + robj *shared = dictGetEntryKey(de); + + c = ((unsigned long) dictGetEntryVal(de))+1; + dictGetEntryVal(de) = (void*) c; + incrRefCount(shared); + decrRefCount(o); + return shared; + } else { + /* Here we are using a stream algorihtm: Every time an object is + * shared we increment its count, everytime there is a miss we + * recrement the counter of a random object. If this object reaches + * zero we remove the object and put the current object instead. */ + if (dictGetHashTableUsed(server.sharingpool) >= + server.sharingpoolsize) { + de = dictGetRandomKey(server.sharingpool); + assert(de != NULL); + c = ((unsigned long) dictGetEntryVal(de))-1; + dictGetEntryVal(de) = (void*) c; + if (c == 0) { + dictDelete(server.sharingpool,de->key); + } + } else { + c = 0; /* If the pool is empty we want to add this object */ + } + if (c == 0) { + int retval; + + retval = dictAdd(server.sharingpool,o,(void*)1); + assert(retval == DICT_OK); + incrRefCount(o); + } + return o; + } +} + /*============================ DB saving/loading ============================ */ static int rdbSaveType(FILE *fp, unsigned char type) { @@ -1424,16 +1487,16 @@ static int rdbSaveLen(FILE *fp, uint32_t len) { if (len < (1<<6)) { /* Save a 6 bit len */ - buf[0] = (len&0xFF)|REDIS_RDB_6BITLEN; + buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6); if (fwrite(buf,1,1,fp) == 0) return -1; } else if (len < (1<<14)) { /* Save a 14 bit len */ - buf[0] = ((len>>8)&0xFF)|REDIS_RDB_14BITLEN; + buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6); buf[1] = len&0xFF; if (fwrite(buf,4,1,fp) == 0) return -1; } else { /* Save a 32 bit len */ - buf[0] = REDIS_RDB_32BITLEN; + buf[0] = (REDIS_RDB_32BITLEN<<6); if (fwrite(buf,1,1,fp) == 0) return -1; len = htonl(len); if (fwrite(&len,4,1,fp) == 0) return -1; @@ -1441,6 +1504,14 @@ static int rdbSaveLen(FILE *fp, uint32_t len) { return 0; } +static int rdbSaveStringObject(FILE *fp, robj *obj) { + size_t len = sdslen(obj->ptr); + + if (rdbSaveLen(fp,len) == -1) return -1; + if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1; + return 0; +} + /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ static int rdbSave(char *filename) { dictIterator *di = NULL; @@ -1475,15 +1546,10 @@ static int rdbSave(char *filename) { robj *o = dictGetEntryVal(de); if (rdbSaveType(fp,o->type) == -1) goto werr; - if (rdbSaveLen(fp,sdslen(key->ptr)) == -1) goto werr; - if (fwrite(key->ptr,sdslen(key->ptr),1,fp) == 0) goto werr; + if (rdbSaveStringObject(fp,key) == -1) goto werr; if (o->type == REDIS_STRING) { /* Save a string value */ - sds sval = o->ptr; - - if (rdbSaveLen(fp,sdslen(sval)) == -1) goto werr; - if (sdslen(sval) && - fwrite(sval,sdslen(sval),1,fp) == 0) goto werr; + if (rdbSaveStringObject(fp,o) == -1) goto werr; } else if (o->type == REDIS_LIST) { /* Save a list value */ list *list = o->ptr; @@ -1493,10 +1559,7 @@ static int rdbSave(char *filename) { while(ln) { robj *eleobj = listNodeValue(ln); - if (rdbSaveLen(fp,sdslen(eleobj->ptr)) == -1) goto werr; - if (sdslen(eleobj->ptr) && - fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0) - goto werr; + if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; ln = ln->next; } } else if (o->type == REDIS_SET) { @@ -1508,13 +1571,9 @@ static int rdbSave(char *filename) { if (!set) oom("dictGetIteraotr"); if (rdbSaveLen(fp,dictGetHashTableUsed(set)) == -1) goto werr; while((de = dictNext(di)) != NULL) { - robj *eleobj; + robj *eleobj = dictGetEntryKey(de); - eleobj = dictGetEntryKey(de); - if (rdbSaveLen(fp,sdslen(eleobj->ptr)) == -1) goto werr; - if (sdslen(eleobj->ptr) && - fwrite(eleobj->ptr,sdslen(eleobj->ptr),1,fp) == 0) - goto werr; + if (rdbSaveStringObject(fp,eleobj) == -1) goto werr; } dictReleaseIterator(di); } else { @@ -1600,7 +1659,6 @@ static uint32_t rdbLoadLen(FILE *fp, int rdbver) { return ntohl(len); } } - return 0; } static robj *rdbLoadStringObject(FILE*fp,int rdbver) { @@ -1613,7 +1671,7 @@ static robj *rdbLoadStringObject(FILE*fp,int rdbver) { sdsfree(val); return NULL; } - return createObject(REDIS_STRING,val); + return tryObjectSharing(createObject(REDIS_STRING,val)); } static int rdbLoad(char *filename) { @@ -1625,7 +1683,6 @@ static int rdbLoad(char *filename) { dict *d = server.dict[0]; char buf[1024]; int rdbver; - fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; if (fread(buf,9,1,fp) == 0) goto eoferr; diff --git a/redis.conf b/redis.conf index 3727efd5..803e3ec1 100644 --- a/redis.conf +++ b/redis.conf @@ -68,3 +68,9 @@ databases 16 # single TCP packet. Uses a bit more CPU but most of the times it is a win # in terms of number of queries per second. Use 'yes' if unsure. glueoutputbuf yes + +# Use object sharing. Can save a lot of memory if you have many common +# string in your dataset, but performs lookups against the shared objects +# pool so it uses more CPU and can be a bit slower. Usually it's a good +# idea. +shareobjects no -- 2.45.2