From c9d0c3623a7714bd41a35237f4ba927206a7adb6 Mon Sep 17 00:00:00 2001 From: antirez Date: Sat, 25 Jun 2011 12:22:03 +0200 Subject: [PATCH] diskstore removed --- redis.conf | 26 - src/Makefile | 2 +- src/aof.c | 4 - src/config.c | 12 - src/db.c | 82 +-- src/debug.c | 20 - src/diskstore.c | 509 ------------------ src/dscache.c | 1028 ------------------------------------- src/networking.c | 23 - src/object.c | 10 +- src/rdb.c | 19 +- src/redis.c | 104 +--- src/redis.h | 91 +--- tests/assets/default.conf | 26 - 14 files changed, 12 insertions(+), 1944 deletions(-) delete mode 100644 src/diskstore.c delete mode 100644 src/dscache.c diff --git a/redis.conf b/redis.conf index 87d34eef..6d18e2f5 100644 --- a/redis.conf +++ b/redis.conf @@ -312,32 +312,6 @@ no-appendfsync-on-rewrite no auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb -#################################### DISK STORE ############################### - -# When disk store is active Redis works as an on-disk database, where memory -# is only used as a object cache. -# -# This mode is good for datasets that are bigger than memory, and in general -# when you want to trade speed for: -# -# - less memory used -# - immediate server restart -# - per key durability, without need for backgrond savig -# -# On the other hand, with disk store enabled MULTI/EXEC are no longer -# transactional from the point of view of the persistence on disk, that is, -# Redis transactions will still guarantee that commands are either processed -# all or nothing, but there is no guarantee that all the keys are flushed -# on disk in an atomic way. -# -# Of course with disk store enabled Redis is not as fast as it is when -# working with just the memory back end. - -diskstore-enabled no -diskstore-path redis.ds -cache-max-memory 0 -cache-flush-delay 0 - ############################### ADVANCED CONFIG ############################### # Hashes are encoded in a special way (much more memory efficient) when they diff --git a/src/Makefile b/src/Makefile index 292255cd..f499a194 100644 --- a/src/Makefile +++ b/src/Makefile @@ -61,7 +61,7 @@ QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR); endif -OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o dscache.o pubsub.o multi.o debug.o sort.o intset.o syncio.o diskstore.o cluster.o crc16.o endian.o +OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o diff --git a/src/aof.c b/src/aof.c index dbd0468d..ac6b9791 100644 --- a/src/aof.c +++ b/src/aof.c @@ -574,10 +574,6 @@ int rewriteAppendOnlyFileBackground(void) { long long start; if (server.bgrewritechildpid != -1) return REDIS_ERR; - if (server.ds_enabled != 0) { - redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. 
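/* For context: the config.c branch removed above parsed the four diskstore
 * directives with yesnotoi() for booleans and memtoll() for sizes such as
 * "64mb". A rough standalone sketch of that parsing; the two helpers here
 * are simplified approximations, not the real Redis functions. */
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>

static int yesnotoi_sketch(const char *s) {
    if (!strcasecmp(s,"yes")) return 1;
    if (!strcasecmp(s,"no")) return 0;
    return -1;                          /* anything else is a config error */
}

static long long memtoll_sketch(const char *s) {
    char *unit;
    long long v = strtoll(s,&unit,10);

    if (!strcasecmp(unit,"kb")) v *= 1024LL;
    else if (!strcasecmp(unit,"mb")) v *= 1024LL*1024;
    else if (!strcasecmp(unit,"gb")) v *= 1024LL*1024*1024;
    return v;
}

int main(void) {
    printf("diskstore-enabled yes -> %d\n", yesnotoi_sketch("yes"));
    printf("cache-max-memory 64mb -> %lld bytes\n", memtoll_sketch("64mb"));
    return 0;
}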
Operation not performed."); - return REDIS_ERR; - } start = ustime(); if ((childpid = fork()) == 0) { char tmpfile[256]; diff --git a/src/config.c b/src/config.c index 9d30d985..e82fbde4 100644 --- a/src/config.c +++ b/src/config.c @@ -251,18 +251,6 @@ void loadServerConfig(char *filename) { } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) { zfree(server.dbfilename); server.dbfilename = zstrdup(argv[1]); - } else if (!strcasecmp(argv[0],"diskstore-enabled") && argc == 2) { - if ((server.ds_enabled = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"diskstore-path") && argc == 2) { - sdsfree(server.ds_path); - server.ds_path = sdsnew(argv[1]); - } else if (!strcasecmp(argv[0],"cache-max-memory") && argc == 2) { - server.cache_max_memory = memtoll(argv[1],NULL); - } else if (!strcasecmp(argv[0],"cache-flush-delay") && argc == 2) { - server.cache_flush_delay = atoi(argv[1]); - if (server.cache_flush_delay < 0) server.cache_flush_delay = 0; } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2) { server.hash_max_zipmap_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2) { diff --git a/src/db.c b/src/db.c index af237a0a..a02f3043 100644 --- a/src/db.c +++ b/src/db.c @@ -31,13 +31,6 @@ void SlotToKeyDel(robj *key); * the disk object. If it is in this state, we wait. */ -void lookupWaitBusyKey(redisDb *db, robj *key) { - /* FIXME: wait just for this key, not everything */ - waitEmptyIOJobsQueue(); - processAllPendingIOJobs(); - redisAssert((cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG) == 0); -} - robj *lookupKey(redisDb *db, robj *key) { dictEntry *de = dictFind(db->dict,key->ptr); if (de) { @@ -48,52 +41,9 @@ robj *lookupKey(redisDb *db, robj *key) { * a copy on write madness. */ if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) val->lru = server.lruclock; - - if (server.ds_enabled && - cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG) - { - /* Need to wait for the key to get unbusy */ - redisLog(REDIS_DEBUG,"Lookup found a key in SAVEINPROG state. Waiting. (Key was in the cache)"); - lookupWaitBusyKey(db,key); - } server.stat_keyspace_hits++; return val; } else { - time_t expire; - robj *val; - - /* Key not found in the in memory hash table, but if disk store is - * enabled we may have this key on disk. If so load it in memory - * in a blocking way. */ - if (server.ds_enabled && cacheKeyMayExist(db,key)) { - long flags = cacheScheduleIOGetFlags(db,key); - - /* They key is not in cache, but it has a SAVE op in queue? - * The only possibility is that the key was deleted, since - * dirty keys are not evicted. */ - if (flags & REDIS_IO_SAVE) { - server.stat_keyspace_misses++; - return NULL; - } - - /* At this point we need to blocking load the key in memory. - * The first thing we do is waiting here if the key is busy. */ - if (flags & REDIS_IO_SAVEINPROG) { - redisLog(REDIS_DEBUG,"Lookup found a key in SAVEINPROG state. 
Waiting (while force loading)."); - lookupWaitBusyKey(db,key); - } - - redisLog(REDIS_DEBUG,"Force loading key %s via lookup", key->ptr); - val = dsGet(db,key,&expire); - if (val) { - dbAdd(db,key,val); - if (expire != -1) setExpire(db,key,expire); - server.stat_keyspace_hits++; - return val; - } else { - cacheSetKeyDoesNotExist(db,key); - } - } server.stat_keyspace_misses++; return NULL; } @@ -130,7 +80,6 @@ void dbAdd(redisDb *db, robj *key, robj *val) { int retval = dictAdd(db->dict, copy, val); redisAssert(retval == REDIS_OK); - if (server.ds_enabled) cacheSetKeyMayExist(db,key); if (server.cluster_enabled) SlotToKeyAdd(key); } @@ -144,7 +93,6 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { redisAssert(de != NULL); dictReplace(db->dict, key->ptr, val); - if (server.ds_enabled) cacheSetKeyMayExist(db,key); } /* High level Set operation. This function can be used in order to set @@ -196,14 +144,6 @@ robj *dbRandomKey(redisDb *db) { /* Delete a key, value, and associated expiration entry if any, from the DB */ int dbDelete(redisDb *db, robj *key) { - /* If diskstore is enabled make sure to awake waiting clients for this key - * as it is not really useful to wait for a key already deleted to be - * loaded from disk. */ - if (server.ds_enabled) { - handleClientsBlockedOnSwappedKey(db,key); - cacheSetKeyDoesNotExist(db,key); - } - /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); @@ -225,7 +165,6 @@ long long emptyDb() { removed += dictSize(server.db[j].dict); dictEmpty(server.db[j].dict); dictEmpty(server.db[j].expires); - if (server.ds_enabled) dictEmpty(server.db[j].io_negcache); } return removed; } @@ -248,8 +187,6 @@ int selectDb(redisClient *c, int id) { void signalModifiedKey(redisDb *db, robj *key) { touchWatchedKey(db,key); - if (server.ds_enabled) - cacheScheduleIO(db,key,REDIS_IO_SAVE); } void signalFlushedDb(int dbid) { @@ -265,7 +202,6 @@ void flushdbCommand(redisClient *c) { signalFlushedDb(c->db->id); dictEmpty(c->db->dict); dictEmpty(c->db->expires); - if (server.ds_enabled) dsFlushDb(c->db->id); addReply(c,shared.ok); } @@ -277,10 +213,7 @@ void flushallCommand(redisClient *c) { kill(server.bgsavechildpid,SIGKILL); rdbRemoveTempFile(server.bgsavechildpid); } - if (server.ds_enabled) - dsFlushDb(-1); - else - rdbSave(server.dbfilename); + rdbSave(server.dbfilename); server.dirty++; } @@ -288,22 +221,10 @@ void delCommand(redisClient *c) { int deleted = 0, j; for (j = 1; j < c->argc; j++) { - if (server.ds_enabled) { - lookupKeyRead(c->db,c->argv[j]); - /* FIXME: this can be optimized a lot, no real need to load - * a possibly huge value. 
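/* The lookupKey() flow removed above reads as: memory first, then the
 * negative cache, then a blocking disk load, with a queued SAVE for an
 * uncached key treated as a pending delete. A toy model of that control
 * flow, with booleans standing in for the dict, the negative cache and
 * the disk store; illustrative only. */
#include <stdio.h>
#include <stdbool.h>

typedef struct {
    bool in_memory;       /* key present in the in-memory dict */
    bool negcached;       /* key known NOT to exist on disk */
    bool save_in_queue;   /* a SAVE (here: a delete) queued for the key */
    bool on_disk;         /* key actually present in the disk store */
} toykey;

static const char *lookup(toykey *k) {
    if (k->in_memory) return "hit (memory)";
    if (!k->negcached) {
        /* A queued SAVE with no cached value can only mean deletion,
         * since dirty keys are never evicted: report a miss. */
        if (k->save_in_queue) return "miss (pending delete)";
        if (k->on_disk) { k->in_memory = true; return "hit (loaded from disk)"; }
        k->negcached = true;            /* remember the miss */
    }
    return "miss";
}

int main(void) {
    toykey a = { .on_disk = true };
    toykey b = { .save_in_queue = true };

    printf("a: %s\n", lookup(&a));      /* hit (loaded from disk) */
    printf("b: %s\n", lookup(&b));      /* miss (pending delete) */
    return 0;
}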
*/ - } if (dbDelete(c->db,c->argv[j])) { signalModifiedKey(c->db,c->argv[j]); server.dirty++; deleted++; - } else if (server.ds_enabled) { - if (cacheKeyMayExist(c->db,c->argv[j]) && - dsExists(c->db,c->argv[j])) - { - cacheScheduleIO(c->db,c->argv[j],REDIS_IO_SAVE); - deleted = 1; - } } } addReplyLongLong(c,deleted); @@ -618,7 +539,6 @@ void expireatCommand(redisClient *c) { void ttlCommand(redisClient *c) { time_t expire, ttl = -1; - if (server.ds_enabled) lookupKeyRead(c->db,c->argv[1]); expire = getExpire(c->db,c->argv[1]); if (expire != -1) { ttl = (expire-time(NULL)); diff --git a/src/debug.c b/src/debug.c index 170bffa5..511512dd 100644 --- a/src/debug.c +++ b/src/debug.c @@ -212,26 +212,7 @@ void computeDatasetDigest(unsigned char *final) { void debugCommand(redisClient *c) { if (!strcasecmp(c->argv[1]->ptr,"segfault")) { *((char*)-1) = 'x'; - } else if (!strcasecmp(c->argv[1]->ptr,"flushcache")) { - if (!server.ds_enabled) { - addReplyError(c, "DEBUG FLUSHCACHE called with diskstore off."); - return; - } else if (server.bgsavethread != (pthread_t) -1) { - addReplyError(c, "Can't flush cache while BGSAVE is in progress."); - return; - } else { - /* To flush the whole cache we need to wait for everything to - * be flushed on disk... */ - cacheForcePointInTime(); - emptyDb(); - addReply(c,shared.ok); - return; - } } else if (!strcasecmp(c->argv[1]->ptr,"reload")) { - if (server.ds_enabled) { - addReply(c,shared.ok); - return; - } if (rdbSave(server.dbfilename) != REDIS_OK) { addReply(c,shared.err); return; @@ -256,7 +237,6 @@ void debugCommand(redisClient *c) { robj *val; char *strenc; - if (server.ds_enabled) lookupKeyRead(c->db,c->argv[2]); if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) { addReply(c,shared.nokeyerr); return; diff --git a/src/diskstore.c b/src/diskstore.c deleted file mode 100644 index 9e86364e..00000000 --- a/src/diskstore.c +++ /dev/null @@ -1,509 +0,0 @@ -/* diskstore.c implements a very simple disk backed key-value store used - * by Redis for the "disk" backend. This implementation uses the filesystem - * to store key/value pairs. Every file represents a given key. - * - * The key path is calculated using the SHA1 of the key name. For instance - * the key "foo" is stored as a file name called: - * - * /0b/ee/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 - * - * The couples of characters from the hex output of SHA1 are also used - * to locate two two levels of directories to store the file (as most - * filesystems are not able to handle too many files in a single dir). - * - * In the end there are 65536 final directories (256 directories inside - * every 256 top level directories), so that with 1 billion of files every - * directory will contain in the average 15258 entires, that is ok with - * most filesystems implementation. - * - * Note that since Redis supports multiple databases, the actual key name - * is: - * - * /0b/ee/_0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 - * - * so for instance if the key is inside DB 0: - * - * /0b/ee/0_0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 - * - * The actaul implementation of this disk store is highly dependant to the - * filesystem implementation itself. This implementation may be replaced by - * a B+TREE implementation in future implementations. - * - * Data ok every key is serialized using the same format used for .rdb - * serialization. Everything is serialized on every entry: key name, - * ttl information in case of keys with an associated expire time, and the - * serialized value itself. 
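/* The directory scheme described above is easy to reproduce in isolation.
 * A sketch assuming OpenSSL's SHA1() for the hash (link with -lcrypto);
 * key_to_path() is an illustrative stand-in, not the dsKeyToPath() removed
 * below. */
#include <stdio.h>
#include <string.h>
#include <openssl/sha.h>

static void key_to_path(const char *root, int dbid, const char *key,
                        char *out, size_t outlen)
{
    unsigned char h[SHA_DIGEST_LENGTH];
    char hex[SHA_DIGEST_LENGTH*2+1];
    int j;

    SHA1((const unsigned char*)key,strlen(key),h);
    for (j = 0; j < SHA_DIGEST_LENGTH; j++)
        sprintf(hex+j*2,"%02x",h[j]);
    /* <root>/<hex[0..1]>/<hex[2..3]>/<dbid>_<full 40 char hex> */
    snprintf(out,outlen,"%s/%c%c/%c%c/%d_%s",
        root,hex[0],hex[1],hex[2],hex[3],dbid,hex);
}

int main(void) {
    char path[1024];

    key_to_path("redis.ds",0,"foo",path,sizeof(path));
    /* Prints: redis.ds/0b/ee/0_0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 */
    printf("%s\n",path);
    return 0;
}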
- * - * Because the format is the same of the .rdb files it is trivial to create - * an .rdb file starting from this format just by mean of scanning the - * directories and concatenating entries, with the sole addition of an - * .rdb header at the start and the end-of-db opcode at the end. - * - * ------------------------------------------------------------------------- - * - * Copyright (c) 2010-2011, Salvatore Sanfilippo - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of Redis nor the names of its contributors may be used - * to endorse or promote products derived from this software without - * specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "redis.h" -#include "sha1.h" - -#include -#include -#include - -int create256dir(char *prefix) { - char buf[1024]; - int j; - - for (j = 0; j < 256; j++) { - snprintf(buf,sizeof(buf),"%s%02x",prefix,j); - if (mkdir(buf,0755) == -1) { - redisLog(REDIS_WARNING,"Error creating dir %s for diskstore: %s", - buf,strerror(errno)); - return REDIS_ERR; - } - } - return REDIS_OK; -} - -int dsOpen(void) { - struct stat sb; - int retval, j; - char *path = server.ds_path; - char buf[1024]; - - if ((retval = stat(path,&sb) == -1) && errno != ENOENT) { - redisLog(REDIS_WARNING, "Error opening disk store at %s: %s", - path, strerror(errno)); - return REDIS_ERR; - } - - /* Directory already in place. Assume everything is ok. */ - if (retval == 0 && S_ISDIR(sb.st_mode)) { - redisLog(REDIS_NOTICE,"Disk store %s exists", path); - return REDIS_OK; - } - - /* File exists but it's not a directory */ - if (retval == 0 && !S_ISDIR(sb.st_mode)) { - redisLog(REDIS_WARNING,"Disk store at %s is not a directory", path); - return REDIS_ERR; - } - - /* New disk store, create the directory structure now, as creating - * them in a lazy way is not a good idea, after very few insertions - * we'll need most of the 65536 directories anyway. 
*/ - redisLog(REDIS_NOTICE,"Disk store %s does not exist: creating", path); - if (mkdir(path,0755) == -1) { - redisLog(REDIS_WARNING,"Disk store init failed creating dir %s: %s", - path, strerror(errno)); - return REDIS_ERR; - } - /* Create the top level 256 directories */ - snprintf(buf,sizeof(buf),"%s/",path); - if (create256dir(buf) == REDIS_ERR) return REDIS_ERR; - - /* For every 256 top level dir, create 256 nested dirs */ - for (j = 0; j < 256; j++) { - snprintf(buf,sizeof(buf),"%s/%02x/",path,j); - if (create256dir(buf) == REDIS_ERR) return REDIS_ERR; - } - return REDIS_OK; -} - -int dsClose(void) { - return REDIS_OK; -} - -/* Convert key into full path for this object. Dirty but hopefully - * is fast enough. Returns the length of the returned path. */ -int dsKeyToPath(redisDb *db, char *buf, robj *key) { - SHA1_CTX ctx; - unsigned char hash[20]; - char hex[40], digits[] = "0123456789abcdef"; - int j, l; - char *origbuf = buf; - - SHA1Init(&ctx); - SHA1Update(&ctx,key->ptr,sdslen(key->ptr)); - SHA1Final(hash,&ctx); - - /* Convert the hash into hex format */ - for (j = 0; j < 20; j++) { - hex[j*2] = digits[(hash[j]&0xF0)>>4]; - hex[(j*2)+1] = digits[hash[j]&0x0F]; - } - - /* Create the object path. Start with server.ds_path that's the root dir */ - l = sdslen(server.ds_path); - memcpy(buf,server.ds_path,l); - buf += l; - *buf++ = '/'; - - /* Then add xx/yy/ that is the two level directories */ - buf[0] = hex[0]; - buf[1] = hex[1]; - buf[2] = '/'; - buf[3] = hex[2]; - buf[4] = hex[3]; - buf[5] = '/'; - buf += 6; - - /* Add the database number followed by _ and finall the SHA1 hex */ - l = ll2string(buf,64,db->id); - buf += l; - buf[0] = '_'; - memcpy(buf+1,hex,40); - buf[41] = '\0'; - return (buf-origbuf)+41; -} - -int dsSet(redisDb *db, robj *key, robj *val, time_t expire) { - char buf[1024], buf2[1024]; - FILE *fp; - int retval, len; - - len = dsKeyToPath(db,buf,key); - memcpy(buf2,buf,len); - snprintf(buf2+len,sizeof(buf2)-len,"-%ld-%ld",(long)time(NULL),(long)val); - while ((fp = fopen(buf2,"w")) == NULL) { - if (errno == ENOSPC) { - redisLog(REDIS_WARNING,"Diskstore: No space left on device. Please make room and wait 30 seconds for Redis to continue."); - sleep(30); - } else { - redisLog(REDIS_WARNING,"diskstore error opening %s: %s", - buf2, strerror(errno)); - redisPanic("Unrecoverable diskstore error. Exiting."); - } - } - if ((retval = rdbSaveKeyValuePair(fp,key,val,expire,time(NULL))) == -1) - return REDIS_ERR; - fclose(fp); - if (retval == 0) { - /* Expired key. Unlink failing not critical */ - unlink(buf); - unlink(buf2); - } else { - /* Use rename for atomic updadte of value */ - if (rename(buf2,buf) == -1) { - redisLog(REDIS_WARNING,"rename(2) returned an error: %s", - strerror(errno)); - redisPanic("Unrecoverable diskstore error. Exiting."); - } - } - return REDIS_OK; -} - -robj *dsGet(redisDb *db, robj *key, time_t *expire) { - char buf[1024]; - int type; - time_t expiretime = -1; /* -1 means: no expire */ - robj *dskey; /* Key as loaded from disk. 
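/* dsSet() above relies on the classic write-to-temporary-file-then-
 * rename(2) idiom, so a reader can never observe a half-written value.
 * The idiom in isolation (atomic_write() is a hypothetical helper): */
#include <stdio.h>
#include <time.h>
#include <unistd.h>

static int atomic_write(const char *path, const char *data) {
    char tmp[1024];
    FILE *fp;

    /* Unique temporary name next to the final one, as dsSet() does. */
    snprintf(tmp,sizeof(tmp),"%s-%ld-%ld",path,(long)time(NULL),(long)getpid());
    if ((fp = fopen(tmp,"w")) == NULL) return -1;
    if (fputs(data,fp) == EOF) { fclose(fp); unlink(tmp); return -1; }
    if (fclose(fp) == EOF) { unlink(tmp); return -1; }
    /* rename(2) atomically replaces any previous version of the file. */
    if (rename(tmp,path) == -1) { unlink(tmp); return -1; }
    return 0;
}

int main(void) {
    return atomic_write("value.dat","hello\n") == 0 ? 0 : 1;
}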
*/ - robj *val; - FILE *fp; - - dsKeyToPath(db,buf,key); - fp = fopen(buf,"r"); - if (fp == NULL && errno == ENOENT) return NULL; /* No such key */ - if (fp == NULL) { - redisLog(REDIS_WARNING,"Disk store failed opening %s: %s", - buf, strerror(errno)); - goto readerr; - } - - if ((type = rdbLoadType(fp)) == -1) goto readerr; - if (type == REDIS_EXPIRETIME) { - if ((expiretime = rdbLoadTime(fp)) == -1) goto readerr; - /* We read the time so we need to read the object type again */ - if ((type = rdbLoadType(fp)) == -1) goto readerr; - } - /* Read key */ - if ((dskey = rdbLoadStringObject(fp)) == NULL) goto readerr; - /* Read value */ - if ((val = rdbLoadObject(type,fp)) == NULL) goto readerr; - fclose(fp); - - /* The key we asked, and the key returned, must be the same */ - redisAssert(equalStringObjects(key,dskey)); - - /* Check if the key already expired */ - decrRefCount(dskey); - if (expiretime != -1 && expiretime < time(NULL)) { - decrRefCount(val); - unlink(buf); /* This failing is non critical here */ - return NULL; - } - - /* Everything ok... */ - *expire = expiretime; - return val; - -readerr: - redisLog(REDIS_WARNING,"Read error reading reading %s. Corrupted key?", - buf); - redisPanic("Unrecoverable error reading from disk store"); - return NULL; /* unreached */ -} - -int dsDel(redisDb *db, robj *key) { - char buf[1024]; - - dsKeyToPath(db,buf,key); - if (unlink(buf) == -1) { - if (errno == ENOENT) { - return REDIS_ERR; - } else { - redisLog(REDIS_WARNING,"Disk store can't remove %s: %s", - buf, strerror(errno)); - redisPanic("Unrecoverable Disk store errore. Existing."); - return REDIS_ERR; /* unreached */ - } - } else { - return REDIS_OK; - } -} - -int dsExists(redisDb *db, robj *key) { - char buf[1024]; - - dsKeyToPath(db,buf,key); - return access(buf,R_OK) == 0; -} - -int dsGetDbidFromFilename(char *path) { - char id[64]; - char *p = strchr(path,'_'); - int len = (p - path); - - redisAssert(p != NULL && len < 64); - memcpy(id,path,len); - id[len] = '\0'; - return atoi(id); -} - -void dsFlushOneDir(char *path, int dbid) { - DIR *dir; - struct dirent *dp, de; - - dir = opendir(path); - if (dir == NULL) { - redisLog(REDIS_WARNING,"Disk store can't open dir %s: %s", - path, strerror(errno)); - redisPanic("Unrecoverable Disk store errore. Existing."); - } - while(1) { - char buf[1024]; - - readdir_r(dir,&de,&dp); - if (dp == NULL) break; - if (dp->d_name[0] == '.') continue; - - /* Check if we need to remove this entry accordingly to the - * DB number. */ - if (dbid != -1 && dsGetDbidFromFilename(dp->d_name)) continue; - - /* Finally unlink the file */ - snprintf(buf,1024,"%s/%s",path,dp->d_name); - if (unlink(buf) == -1) { - redisLog(REDIS_WARNING, - "Can't unlink %s: %s", buf, strerror(errno)); - redisPanic("Unrecoverable Disk store errore. Existing."); - } - } - closedir(dir); -} - -void dsFlushDb(int dbid) { - char buf[1024]; - int j, i; - - redisLog(REDIS_NOTICE,"Flushing diskstore DB (%d)",dbid); - for (j = 0; j < 256; j++) { - for (i = 0; i < 256; i++) { - snprintf(buf,1024,"%s/%02x/%02x",server.ds_path,j,i); - dsFlushOneDir(buf,dbid); - } - } -} - -void dsRdbSaveSetState(int state) { - pthread_mutex_lock(&server.bgsavethread_mutex); - server.bgsavethread_state = state; - pthread_mutex_unlock(&server.bgsavethread_mutex); -} - -void *dsRdbSave_thread(void *arg) { - char tmpfile[256], *filename = (char*)arg; - struct dirent *dp, de; - int j, i, last_dbid = -1; - FILE *fp; - - /* Change state to ACTIVE, to signal there is a saving thead working. 
*/ - redisLog(REDIS_NOTICE,"Diskstore BGSAVE thread started"); - dsRdbSaveSetState(REDIS_BGSAVE_THREAD_ACTIVE); - - snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); - fp = fopen(tmpfile,"w"); - if (!fp) { - redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s", - strerror(errno)); - dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR); - return NULL; - } - if (fwrite("REDIS0001",9,1,fp) == 0) goto werr; - - sleep(5); - - /* Scan all diskstore dirs looking for keys */ - for (j = 0; j < 256; j++) { - for (i = 0; i < 256; i++) { - DIR *dir; - char buf[1024]; - - /* For every directory, collect all the keys */ - snprintf(buf,sizeof(buf),"%s/%02x/%02x",server.ds_path,j,i); - if ((dir = opendir(buf)) == NULL) { - redisLog(REDIS_WARNING,"Disk store can't open dir %s: %s", - buf, strerror(errno)); - goto werr; - } - - while(1) { - char buf[1024]; - int dbid; - FILE *entryfp; - - readdir_r(dir,&de,&dp); - if (dp == NULL) break; - if (dp->d_name[0] == '.') continue; - /* If there is a '-' char in the file name, it's a temp file */ - if (strchr(dp->d_name,'-') != NULL) continue; - - /* Emit the SELECT DB opcode if needed. */ - dbid = dsGetDbidFromFilename(dp->d_name); - if (dbid != last_dbid) { - last_dbid = dbid; - if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr; - if (rdbSaveLen(fp,dbid) == -1) goto werr; - } - - /* Let's copy this file into the target .rdb */ - snprintf(buf,sizeof(buf),"%s/%02x/%02x/%s", - server.ds_path,j,i,dp->d_name); - if ((entryfp = fopen(buf,"r")) == NULL) { - redisLog(REDIS_WARNING,"Can't open %s: %s", - buf,strerror(errno)); - closedir(dir); - goto werr; - } - while(1) { - int nread = fread(buf,1,sizeof(buf),entryfp); - - if (nread == 0) { - if (ferror(entryfp)) { - redisLog(REDIS_WARNING,"Error reading from file entry while performing BGSAVE for diskstore: %s", strerror(errno)); - closedir(dir); - goto werr; - } else { - break; - } - } - if (fwrite(buf,1,nread,fp) != (unsigned)nread) { - closedir(dir); - goto werr; - } - } - fclose(entryfp); - } - closedir(dir); - } - } - - /* Output the end of file opcode */ - if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr; - - /* Make sure data will not remain on the OS's output buffers */ - fflush(fp); - fsync(fileno(fp)); - fclose(fp); - zfree(filename); - - /* Use RENAME to make sure the DB file is changed atomically only - * if the generate DB file is ok. */ - if (rename(tmpfile,filename) == -1) { - redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s (diskstore)", strerror(errno)); - unlink(tmpfile); - dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR); - return NULL; - } - redisLog(REDIS_NOTICE,"DB saved on disk by diskstore thread"); - dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_OK); - return NULL; - -werr: - zfree(filename); - fclose(fp); - unlink(tmpfile); - dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR); - redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno)); - return NULL; -} - -int dsRdbSaveBackground(char *filename) { - pthread_t thread; - - if (pthread_create(&thread,NULL,dsRdbSave_thread,zstrdup(filename)) != 0) { - redisLog(REDIS_WARNING,"Can't create diskstore BGSAVE thread: %s", - strerror(errno)); - return REDIS_ERR; - } else { - server.bgsavethread = thread; - return REDIS_OK; - } -} - -int dsRdbSave(char *filename) { - /* A blocking save is actually a non blocking save... just we wait - * for it to terminate in a non-busy loop. 
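/* As the comment above says, the "blocking" save below is really a
 * background save plus a polled wait: the worker publishes its state
 * under a mutex and the caller sleeps briefly between polls instead of
 * busy-spinning. A self-contained sketch of the pattern (illustrative
 * names; compile with -lpthread): */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define THREAD_ACTIVE 0
#define THREAD_DONE 1

static pthread_mutex_t state_mutex = PTHREAD_MUTEX_INITIALIZER;
static int thread_state = THREAD_ACTIVE;

static void *worker(void *arg) {
    (void)arg;
    usleep(50000);                       /* stand-in for the actual save */
    pthread_mutex_lock(&state_mutex);
    thread_state = THREAD_DONE;
    pthread_mutex_unlock(&state_mutex);
    return NULL;
}

int main(void) {
    pthread_t t;
    int state;

    pthread_create(&t,NULL,worker,NULL);
    do {
        usleep(1000);                    /* 1 ms between polls */
        pthread_mutex_lock(&state_mutex);
        state = thread_state;
        pthread_mutex_unlock(&state_mutex);
    } while (state != THREAD_DONE);
    pthread_join(t,NULL);
    puts("save finished");
    return 0;
}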
*/ - - redisLog(REDIS_NOTICE,"Starting a blocking SAVE (BGSAVE + blocking wait)"); - server.dirty_before_bgsave = server.dirty; - if (dsRdbSaveBackground(filename) == REDIS_ERR) return REDIS_ERR; - while(1) { - usleep(1000); - int state; - - pthread_mutex_lock(&server.bgsavethread_mutex); - state = server.bgsavethread_state; - pthread_mutex_unlock(&server.bgsavethread_mutex); - - if (state == REDIS_BGSAVE_THREAD_DONE_OK || - state == REDIS_BGSAVE_THREAD_DONE_ERR) break; - } - return REDIS_OK; -} diff --git a/src/dscache.c b/src/dscache.c deleted file mode 100644 index 4b3204c5..00000000 --- a/src/dscache.c +++ /dev/null @@ -1,1028 +0,0 @@ -#include "redis.h" - -#include -#include -#include -#include - -/* dscache.c - Disk store cache for disk store backend. - * - * When Redis is configured for using disk as backend instead of memory, the - * memory is used as a cache, so that recently accessed keys are taken in - * memory for fast read and write operations. - * - * Modified keys are marked to be flushed on disk, and will be flushed - * as long as the maxium configured flush time elapsed. - * - * This file implements the whole caching subsystem and contains further - * documentation. */ - -/* TODO: - * - * WARNING: most of the following todo items and design issues are no - * longer relevant with the new design. Here as a checklist to see if - * some old ideas still apply. - * - * - What happens when an object is destroyed? - * - * If the object is destroyed since semantically it was deleted or - * replaced with something new, we don't care if there was a SAVE - * job pending for it. Anyway when the IO JOb will be created we'll get - * the pointer of the current value. - * - * If the object is already a REDIS_IO_SAVEINPROG object, then it is - * impossible that we get a decrRefCount() that will reach refcount of zero - * since the object is both in the dataset and in the io job entry. - * - * - What happens with MULTI/EXEC? - * - * Good question. Without some kind of versioning with a global counter - * it is not possible to have trasactions on disk, but they are still - * useful since from the point of view of memory and client bugs it is - * a protection anyway. Also it's useful for WATCH. - * - * Btw there is to check what happens when WATCH gets combined to keys - * that gets removed from the object cache. Should be save but better - * to check. - * - * - Check if/why INCR will not update the LRU info for the object. - * - * - Fix/Check the following race condition: a key gets a DEL so there is - * a write operation scheduled against this key. Later the same key will - * be the argument of a GET, but the write operation was still not - * completed (to delete the file). If the GET will be for some reason - * a blocking loading (via lookup) we can load the old value on memory. - * - * This problems can be fixed with negative caching. We can use it - * to optimize the system, but also when a key is deleted we mark - * it as non existing on disk as well (in a way that this cache - * entry can't be evicted, setting time to 0), then we avoid looking at - * the disk at all if the key can't be there. When an IO Job complete - * a deletion, we set the time of the negative caching to a non zero - * value so it will be evicted later. - * - * Are there other patterns like this where we load stale data? - * - * Also, make sure that key preloading is ONLY done for keys that are - * not marked as cacheKeyDoesNotExist(), otherwise, again, we can load - * data from disk that should instead be deleted. 
- * - * - dsSet() should use rename(2) in order to avoid corruptions. - * - * - Don't add a LOAD if there is already a LOADINPROGRESS, or is this - * impossible since anyway the io_keys stuff will work as lock? - * - * - Serialize special encoded things in a raw form. - * - * - When putting IO read operations on top of the queue, do this only if - * the already-on-top operation is not a save or if it is a save that - * is scheduled for later execution. If there is a save that is ready to - * fire, let's insert the load operation just before the first save that - * is scheduled for later exection for instance. - * - * - Support MULTI/EXEC transactions via a journal file, that is played on - * startup to check if there is cleanup to do. This way we can implement - * transactions with our simple file based KV store. - */ - -/* Virtual Memory is composed mainly of two subsystems: - * - Blocking Virutal Memory - * - Threaded Virtual Memory I/O - * The two parts are not fully decoupled, but functions are split among two - * different sections of the source code (delimited by comments) in order to - * make more clear what functionality is about the blocking VM and what about - * the threaded (not blocking) VM. - * - * Redis VM design: - * - * Redis VM is a blocking VM (one that blocks reading swapped values from - * disk into memory when a value swapped out is needed in memory) that is made - * unblocking by trying to examine the command argument vector in order to - * load in background values that will likely be needed in order to exec - * the command. The command is executed only once all the relevant keys - * are loaded into memory. - * - * This basically is almost as simple of a blocking VM, but almost as parallel - * as a fully non-blocking VM. - */ - -void spawnIOThread(void); -int cacheScheduleIOPushJobs(int flags); -int processActiveIOJobs(int max); - -/* =================== Virtual Memory - Blocking Side ====================== */ - -void dsInit(void) { - int pipefds[2]; - size_t stacksize; - - zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ - - redisLog(REDIS_NOTICE,"Opening Disk Store: %s", server.ds_path); - /* Open Disk Store */ - if (dsOpen() != REDIS_OK) { - redisLog(REDIS_WARNING,"Fatal error opening disk store. Exiting."); - exit(1); - }; - - /* Initialize threaded I/O for Object Cache */ - server.io_newjobs = listCreate(); - server.io_processing = listCreate(); - server.io_processed = listCreate(); - server.io_ready_clients = listCreate(); - pthread_mutex_init(&server.io_mutex,NULL); - pthread_cond_init(&server.io_condvar,NULL); - pthread_mutex_init(&server.bgsavethread_mutex,NULL); - server.io_active_threads = 0; - if (pipe(pipefds) == -1) { - redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting." 
- ,strerror(errno)); - exit(1); - } - server.io_ready_pipe_read = pipefds[0]; - server.io_ready_pipe_write = pipefds[1]; - redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR); - /* LZF requires a lot of stack */ - pthread_attr_init(&server.io_threads_attr); - pthread_attr_getstacksize(&server.io_threads_attr, &stacksize); - - /* Solaris may report a stacksize of 0, let's set it to 1 otherwise - * multiplying it by 2 in the while loop later will not really help ;) */ - if (!stacksize) stacksize = 1; - - while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; - pthread_attr_setstacksize(&server.io_threads_attr, stacksize); - /* Listen for events in the threaded I/O pipe */ - if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE, - vmThreadedIOCompletedJob, NULL) == AE_ERR) - oom("creating file event"); - - /* Spawn our I/O thread */ - spawnIOThread(); -} - -/* Compute how good candidate the specified object is for eviction. - * An higher number means a better candidate. */ -double computeObjectSwappability(robj *o) { - /* actual age can be >= minage, but not < minage. As we use wrapping - * 21 bit clocks with minutes resolution for the LRU. */ - return (double) estimateObjectIdleTime(o); -} - -/* Try to free one entry from the diskstore object cache */ -int cacheFreeOneEntry(void) { - int j, i; - struct dictEntry *best = NULL; - double best_swappability = 0; - redisDb *best_db = NULL; - robj *val; - sds key; - - for (j = 0; j < server.dbnum; j++) { - redisDb *db = server.db+j; - /* Why maxtries is set to 100? - * Because this way (usually) we'll find 1 object even if just 1% - 2% - * are swappable objects */ - int maxtries = 100; - - for (i = 0; i < 5 && dictSize(db->dict); i++) { - dictEntry *de; - double swappability; - robj keyobj; - sds keystr; - - if (maxtries) maxtries--; - de = dictGetRandomKey(db->dict); - keystr = dictGetEntryKey(de); - val = dictGetEntryVal(de); - initStaticStringObject(keyobj,keystr); - - /* Don't remove objects that are currently target of a - * read or write operation. */ - if (cacheScheduleIOGetFlags(db,&keyobj) != 0) { - if (maxtries) i--; /* don't count this try */ - continue; - } - swappability = computeObjectSwappability(val); - if (!best || swappability > best_swappability) { - best = de; - best_swappability = swappability; - best_db = db; - } - } - } - if (best == NULL) { - /* Not able to free a single object? we should check if our - * IO queues have stuff in queue, and try to consume the queue - * otherwise we'll use an infinite amount of memory if changes to - * the dataset are faster than I/O */ - if (listLength(server.cache_io_queue) > 0) { - redisLog(REDIS_DEBUG,"--- Busy waiting IO to reclaim memory"); - cacheScheduleIOPushJobs(REDIS_IO_ASAP); - processActiveIOJobs(1); - return REDIS_OK; - } - /* Nothing to free at all... */ - return REDIS_ERR; - } - key = dictGetEntryKey(best); - val = dictGetEntryVal(best); - - redisLog(REDIS_DEBUG,"Key selected for cache eviction: %s swappability:%f", - key, best_swappability); - - /* Delete this key from memory */ - { - robj *kobj = createStringObject(key,sdslen(key)); - dbDelete(best_db,kobj); - decrRefCount(kobj); - } - return REDIS_OK; -} - -/* ==================== Disk store negative caching ======================== - * - * When disk store is enabled, we need negative caching, that is, to remember - * keys that are for sure *not* on the disk key-value store. 
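/* A sketch of the negative-cache contract this section describes, with a
 * trivial fixed-size table standing in for the per-DB dict (io_negcache):
 * an entry in the table means "known not to be on disk", so the caller
 * can skip the filesystem access entirely. The helpers mirror the
 * cacheKeyMayExist()/cacheSetKeyDoesNotExist() pair below, but are
 * illustrative only. */
#include <stdio.h>
#include <string.h>

#define NEGCACHE_SLOTS 8
static const char *negcache[NEGCACHE_SLOTS];

static int cache_key_may_exist(const char *key) {
    int i;

    for (i = 0; i < NEGCACHE_SLOTS; i++)
        if (negcache[i] && !strcmp(negcache[i],key)) return 0;
    return 1;                            /* no negative entry: may be on disk */
}

static void cache_set_key_does_not_exist(const char *key) {
    int i;

    for (i = 0; i < NEGCACHE_SLOTS; i++)
        if (negcache[i] == NULL) { negcache[i] = key; return; }
}

int main(void) {
    cache_set_key_does_not_exist("foo");
    printf("foo may exist on disk: %d\n", cache_key_may_exist("foo")); /* 0 */
    printf("bar may exist on disk: %d\n", cache_key_may_exist("bar")); /* 1 */
    return 0;
}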
- * - * This is usefuls because without negative caching cache misses will cost us - * a disk lookup, even if the same non existing key is accessed again and again. - * - * With negative caching we remember that the key is not on disk, so if it's - * not in memory and we have a negative cache entry, we don't try a disk - * access at all. - */ - -/* Returns true if the specified key may exists on disk, that is, we don't - * have an entry in our negative cache for this key */ -int cacheKeyMayExist(redisDb *db, robj *key) { - return dictFind(db->io_negcache,key) == NULL; -} - -/* Set the specified key as an entry that may possibily exist on disk, that is, - * remove the negative cache entry for this key if any. */ -void cacheSetKeyMayExist(redisDb *db, robj *key) { - dictDelete(db->io_negcache,key); -} - -/* Set the specified key as non existing on disk, that is, create a negative - * cache entry for this key. */ -void cacheSetKeyDoesNotExist(redisDb *db, robj *key) { - if (dictReplace(db->io_negcache,key,(void*)time(NULL))) { - incrRefCount(key); - } -} - -/* Remove one entry from negative cache using approximated LRU. */ -int negativeCacheEvictOneEntry(void) { - struct dictEntry *de; - robj *best = NULL; - redisDb *best_db = NULL; - time_t time, best_time = 0; - int j; - - for (j = 0; j < server.dbnum; j++) { - redisDb *db = server.db+j; - int i; - - if (dictSize(db->io_negcache) == 0) continue; - for (i = 0; i < 3; i++) { - de = dictGetRandomKey(db->io_negcache); - time = (time_t) dictGetEntryVal(de); - - if (best == NULL || time < best_time) { - best = dictGetEntryKey(de); - best_db = db; - best_time = time; - } - } - } - if (best) { - dictDelete(best_db->io_negcache,best); - return REDIS_OK; - } else { - return REDIS_ERR; - } -} - -/* ================== Disk store cache - Threaded I/O ====================== */ - -void freeIOJob(iojob *j) { - decrRefCount(j->key); - /* j->val can be NULL if the job is about deleting the key from disk. */ - if (j->val) decrRefCount(j->val); - zfree(j); -} - -/* Every time a thread finished a Job, it writes a byte into the write side - * of an unix pipe in order to "awake" the main thread, and this function - * is called. - * - * If privdata == NULL the function will try to put more jobs in the queue - * of IO jobs to process as more room is made. privdata is equal to NULL - * when the function is called from the event loop, so we want to push - * more IO jobs in the queue. Instead when the function is called by - * other functions that want to create a write-barrier to avoid race - * conditions we don't push new jobs in the queue. */ -void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, - int mask) -{ - char buf[1]; - int retval, processed = 0, toprocess = -1; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - - /* For every byte we read in the read side of the pipe, there is one - * I/O job completed to process. 
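/* The one-byte-per-job pipe protocol described above is easy to show in
 * isolation: a worker thread writes a single byte per completed job, and
 * the event loop reads the pipe to learn how many completions to handle.
 * A minimal sketch (compile with -lpthread): */
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

static int pipefds[2];

static void *worker(void *arg) {
    (void)arg;
    /* ...job done: notify the main thread with a single byte. */
    if (write(pipefds[1],"x",1) != 1) perror("write");
    return NULL;
}

int main(void) {
    pthread_t t;
    char c;

    if (pipe(pipefds) == -1) return 1;
    pthread_create(&t,NULL,worker,NULL);
    if (read(pipefds[0],&c,1) == 1)      /* one byte == one completed job */
        puts("one I/O job completed");
    pthread_join(t,NULL);
    return 0;
}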
*/ - while((retval = read(fd,buf,1)) == 1) { - iojob *j; - listNode *ln; - - redisLog(REDIS_DEBUG,"Processing I/O completed job"); - - /* Get the processed element (the oldest one) */ - lockThreadedIO(); - redisAssert(listLength(server.io_processed) != 0); - if (toprocess == -1) { - toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100; - if (toprocess <= 0) toprocess = 1; - } - ln = listFirst(server.io_processed); - j = ln->value; - listDelNode(server.io_processed,ln); - unlockThreadedIO(); - - /* Post process it in the main thread, as there are things we - * can do just here to avoid race conditions and/or invasive locks */ - redisLog(REDIS_DEBUG,"COMPLETED Job type %s, key: %s", - (j->type == REDIS_IOJOB_LOAD) ? "load" : "save", - (unsigned char*)j->key->ptr); - if (j->type == REDIS_IOJOB_LOAD) { - /* Create the key-value pair in the in-memory database */ - if (j->val != NULL) { - /* Note: it's possible that the key is already in memory - * due to a blocking load operation. */ - if (dictFind(j->db->dict,j->key->ptr) == NULL) { - dbAdd(j->db,j->key,j->val); - incrRefCount(j->val); - if (j->expire != -1) setExpire(j->db,j->key,j->expire); - } - } else { - /* Key not found on disk. If it is also not in memory - * as a cached object, nor there is a job writing it - * in background, we are sure the key does not exist - * currently. - * - * So we set a negative cache entry avoiding that the - * resumed client will block load what does not exist... */ - if (dictFind(j->db->dict,j->key->ptr) == NULL && - (cacheScheduleIOGetFlags(j->db,j->key) & - (REDIS_IO_SAVE|REDIS_IO_SAVEINPROG)) == 0) - { - cacheSetKeyDoesNotExist(j->db,j->key); - } - } - cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_LOADINPROG); - handleClientsBlockedOnSwappedKey(j->db,j->key); - } else if (j->type == REDIS_IOJOB_SAVE) { - cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_SAVEINPROG); - } - freeIOJob(j); - processed++; - if (privdata == NULL) cacheScheduleIOPushJobs(0); - if (processed == toprocess) return; - } - if (retval < 0 && errno != EAGAIN) { - redisLog(REDIS_WARNING, - "WARNING: read(2) error in vmThreadedIOCompletedJob() %s", - strerror(errno)); - } -} - -void lockThreadedIO(void) { - pthread_mutex_lock(&server.io_mutex); -} - -void unlockThreadedIO(void) { - pthread_mutex_unlock(&server.io_mutex); -} - -void *IOThreadEntryPoint(void *arg) { - iojob *j; - listNode *ln; - REDIS_NOTUSED(arg); - long long start; - - pthread_detach(pthread_self()); - lockThreadedIO(); - while(1) { - /* Get a new job to process */ - if (listLength(server.io_newjobs) == 0) { - /* Wait for more work to do */ - redisLog(REDIS_DEBUG,"[T] wait for signal"); - pthread_cond_wait(&server.io_condvar,&server.io_mutex); - redisLog(REDIS_DEBUG,"[T] signal received"); - continue; - } - start = ustime(); - redisLog(REDIS_DEBUG,"[T] %ld IO jobs to process", - listLength(server.io_newjobs)); - ln = listFirst(server.io_newjobs); - j = ln->value; - listDelNode(server.io_newjobs,ln); - /* Add the job in the processing queue */ - listAddNodeTail(server.io_processing,j); - ln = listLast(server.io_processing); /* We use ln later to remove it */ - unlockThreadedIO(); - - redisLog(REDIS_DEBUG,"[T] %ld: new job type %s: %p about key '%s'", - (long) pthread_self(), - (j->type == REDIS_IOJOB_LOAD) ? 
"load" : "save", - (void*)j, (char*)j->key->ptr); - - /* Process the Job */ - if (j->type == REDIS_IOJOB_LOAD) { - time_t expire; - - j->val = dsGet(j->db,j->key,&expire); - if (j->val) j->expire = expire; - } else if (j->type == REDIS_IOJOB_SAVE) { - if (j->val) { - dsSet(j->db,j->key,j->val,j->expire); - } else { - dsDel(j->db,j->key); - } - } - - /* Done: insert the job into the processed queue */ - redisLog(REDIS_DEBUG,"[T] %ld completed the job: %p (key %s)", - (long) pthread_self(), (void*)j, (char*)j->key->ptr); - - redisLog(REDIS_DEBUG,"[T] lock IO"); - lockThreadedIO(); - redisLog(REDIS_DEBUG,"[T] IO locked"); - listDelNode(server.io_processing,ln); - listAddNodeTail(server.io_processed,j); - - /* Signal the main thread there is new stuff to process */ - redisAssert(write(server.io_ready_pipe_write,"x",1) == 1); - redisLog(REDIS_DEBUG,"TIME (%c): %lld\n", j->type == REDIS_IOJOB_LOAD ? 'L' : 'S', ustime()-start); - } - /* never reached, but that's the full pattern... */ - unlockThreadedIO(); - return NULL; -} - -void spawnIOThread(void) { - pthread_t thread; - sigset_t mask, omask; - int err; - - sigemptyset(&mask); - sigaddset(&mask,SIGCHLD); - sigaddset(&mask,SIGHUP); - sigaddset(&mask,SIGPIPE); - pthread_sigmask(SIG_SETMASK, &mask, &omask); - while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) { - redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s", - strerror(err)); - usleep(1000000); - } - pthread_sigmask(SIG_SETMASK, &omask, NULL); - server.io_active_threads++; -} - -/* Wait that up to 'max' pending IO Jobs are processed by the I/O thread. - * From our point of view an IO job processed means that the count of - * server.io_processed must increase by one. - * - * If max is -1, all the pending IO jobs will be processed. - * - * Returns the number of IO jobs processed. - * - * NOTE: while this may appear like a busy loop, we are actually blocked - * by IO since we continuously acquire/release the IO lock. */ -int processActiveIOJobs(int max) { - int processed = 0; - - while(max == -1 || max > 0) { - int io_processed_len; - - redisLog(REDIS_DEBUG,"[P] lock IO"); - lockThreadedIO(); - redisLog(REDIS_DEBUG,"Waiting IO jobs processing: new:%d proessing:%d processed:%d",listLength(server.io_newjobs),listLength(server.io_processing),listLength(server.io_processed)); - - if (listLength(server.io_newjobs) == 0 && - listLength(server.io_processing) == 0) - { - /* There is nothing more to process */ - redisLog(REDIS_DEBUG,"[P] Nothing to process, unlock IO, return"); - unlockThreadedIO(); - break; - } - -#if 1 - /* If there are new jobs we need to signal the thread to - * process the next one. FIXME: drop this if useless. 
*/ - redisLog(REDIS_DEBUG,"[P] waitEmptyIOJobsQueue: new %d, processing %d, processed %d", - listLength(server.io_newjobs), - listLength(server.io_processing), - listLength(server.io_processed)); - - if (listLength(server.io_newjobs)) { - redisLog(REDIS_DEBUG,"[P] There are new jobs, signal"); - pthread_cond_signal(&server.io_condvar); - } -#endif - - /* Check if we can process some finished job */ - io_processed_len = listLength(server.io_processed); - redisLog(REDIS_DEBUG,"[P] Unblock IO"); - unlockThreadedIO(); - redisLog(REDIS_DEBUG,"[P] Wait"); - usleep(10000); - if (io_processed_len) { - vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read, - (void*)0xdeadbeef,0); - processed++; - if (max != -1) max--; - } - } - return processed; -} - -void waitEmptyIOJobsQueue(void) { - processActiveIOJobs(-1); -} - -/* Process up to 'max' IO Jobs already completed by threads but still waiting - * processing from the main thread. - * - * If max == -1 all the pending jobs are processed. - * - * The number of processed jobs is returned. */ -int processPendingIOJobs(int max) { - int processed = 0; - - while(max == -1 || max > 0) { - int io_processed_len; - - lockThreadedIO(); - io_processed_len = listLength(server.io_processed); - unlockThreadedIO(); - if (io_processed_len == 0) break; - vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read, - (void*)0xdeadbeef,0); - if (max != -1) max--; - processed++; - } - return processed; -} - -void processAllPendingIOJobs(void) { - processPendingIOJobs(-1); -} - -/* This function must be called while with threaded IO locked */ -void queueIOJob(iojob *j) { - redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n", - (void*)j, j->type, (char*)j->key->ptr); - listAddNodeTail(server.io_newjobs,j); -} - -/* Consume all the IO scheduled operations, and all the thread IO jobs - * so that eventually the state of diskstore is a point-in-time snapshot. - * - * This is useful when we need to BGSAVE with diskstore enabled. */ -void cacheForcePointInTime(void) { - redisLog(REDIS_NOTICE,"Diskstore: synching on disk to reach point-in-time state."); - while (listLength(server.cache_io_queue) != 0) { - cacheScheduleIOPushJobs(REDIS_IO_ASAP); - processActiveIOJobs(1); - } - waitEmptyIOJobsQueue(); - processAllPendingIOJobs(); -} - -void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val, time_t expire) { - iojob *j; - - j = zmalloc(sizeof(*j)); - j->type = type; - j->db = db; - j->key = key; - incrRefCount(key); - j->val = val; - if (val) incrRefCount(val); - j->expire = expire; - - lockThreadedIO(); - queueIOJob(j); - pthread_cond_signal(&server.io_condvar); - unlockThreadedIO(); -} - -/* ============= Disk store cache - Scheduling of IO operations ============= - * - * We use a queue and an hash table to hold the state of IO operations - * so that's fast to lookup if there is already an IO operation in queue - * for a given key. - * - * There are two types of IO operations for a given key: - * REDIS_IO_LOAD and REDIS_IO_SAVE. - * - * The function cacheScheduleIO() function pushes the specified IO operation - * in the queue, but avoid adding the same key for the same operation - * multiple times, thanks to the associated hash table. - * - * We take a set of flags per every key, so when the scheduled IO operation - * gets moved from the scheduled queue to the actual IO Jobs queue that - * is processed by the IO thread, we flag it as IO_LOADINPROG or - * IO_SAVEINPROG. 
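/* The per-key scheduling state is just a small bitmask built from the
 * four REDIS_IO_* bits defined a little further down. A sketch of the
 * transitions a key goes through when a SAVE is scheduled and then picked
 * up by the IO thread (bit values copied from the defines below): */
#include <stdio.h>

#define REDIS_IO_LOAD        1
#define REDIS_IO_SAVE        2
#define REDIS_IO_LOADINPROG  4
#define REDIS_IO_SAVEINPROG  8

int main(void) {
    long flags = 0;

    flags |= REDIS_IO_SAVE;              /* cacheScheduleIO: SAVE queued */
    flags &= ~REDIS_IO_SAVE;             /* pushed to the IO thread... */
    flags |= REDIS_IO_SAVEINPROG;        /* ...and now in progress */
    printf("busy: %s\n", (flags & REDIS_IO_SAVEINPROG) ? "yes" : "no");
    flags &= ~REDIS_IO_SAVEINPROG;       /* job completed: key idle again */
    printf("idle: %s\n", flags == 0 ? "yes" : "no");
    return 0;
}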
- * - * So for every given key we always know if there is some IO operation - * scheduled, or in progress, for this key. - * - * NOTE: all this is very important in order to guarantee correctness of - * the Disk Store Cache. Jobs are always queued here. Load jobs are - * queued at the head for faster execution only in the case there is not - * already a write operation of some kind for this job. - * - * So we have ordering, but can do exceptions when there are no already - * operations for a given key. Also when we need to block load a given - * key, for an immediate lookup operation, we can check if the key can - * be accessed synchronously without race conditions (no IN PROGRESS - * operations for this key), otherwise we blocking wait for completion. */ - -#define REDIS_IO_LOAD 1 -#define REDIS_IO_SAVE 2 -#define REDIS_IO_LOADINPROG 4 -#define REDIS_IO_SAVEINPROG 8 - -void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag) { - struct dictEntry *de = dictFind(db->io_queued,key); - - if (!de) { - dictAdd(db->io_queued,key,(void*)flag); - incrRefCount(key); - return; - } else { - long flags = (long) dictGetEntryVal(de); - - if (flags & flag) { - redisLog(REDIS_WARNING,"Adding the same flag again: was: %ld, addede: %ld",flags,flag); - redisAssert(!(flags & flag)); - } - flags |= flag; - dictGetEntryVal(de) = (void*) flags; - } -} - -void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag) { - struct dictEntry *de = dictFind(db->io_queued,key); - long flags; - - redisAssert(de != NULL); - flags = (long) dictGetEntryVal(de); - redisAssert(flags & flag); - flags &= ~flag; - if (flags == 0) { - dictDelete(db->io_queued,key); - } else { - dictGetEntryVal(de) = (void*) flags; - } -} - -int cacheScheduleIOGetFlags(redisDb *db, robj *key) { - struct dictEntry *de = dictFind(db->io_queued,key); - - return (de == NULL) ? 0 : ((long) dictGetEntryVal(de)); -} - -void cacheScheduleIO(redisDb *db, robj *key, int type) { - ioop *op; - long flags; - - if ((flags = cacheScheduleIOGetFlags(db,key)) & type) return; - - redisLog(REDIS_DEBUG,"Scheduling key %s for %s", - key->ptr, type == REDIS_IO_LOAD ? "loading" : "saving"); - cacheScheduleIOAddFlag(db,key,type); - op = zmalloc(sizeof(*op)); - op->type = type; - op->db = db; - op->key = key; - incrRefCount(key); - op->ctime = time(NULL); - - /* Give priority to load operations if there are no save already - * in queue for the same key. */ - if (type == REDIS_IO_LOAD && !(flags & REDIS_IO_SAVE)) { - listAddNodeHead(server.cache_io_queue, op); - cacheScheduleIOPushJobs(REDIS_IO_ONLYLOADS); - } else { - /* FIXME: probably when this happens we want to at least move - * the write job about this queue on top, and set the creation time - * to a value that will force processing ASAP. */ - listAddNodeTail(server.cache_io_queue, op); - } -} - -/* Push scheduled IO operations into IO Jobs that the IO thread can process. - * - * If flags include REDIS_IO_ONLYLOADS only load jobs are processed:this is - * useful since it's safe to push LOAD IO jobs from any place of the code, while - * SAVE io jobs should never be pushed while we are processing a command - * (not protected by lookupKey() that will block on keys in IO_SAVEINPROG - * state. - * - * The REDIS_IO_ASAP flag tells the function to don't wait for the IO job - * scheduled completion time, but just do the operation ASAP. This is useful - * when we need to reclaim memory from the IO queue. 
- */ -#define MAX_IO_JOBS_QUEUE 10 -int cacheScheduleIOPushJobs(int flags) { - time_t now = time(NULL); - listNode *ln; - int jobs, topush = 0, pushed = 0; - - /* Don't push new jobs if there is a threaded BGSAVE in progress. */ - if (server.bgsavethread != (pthread_t) -1) return 0; - - /* Sync stuff on disk, but only if we have less - * than MAX_IO_JOBS_QUEUE IO jobs. */ - lockThreadedIO(); - jobs = listLength(server.io_newjobs); - unlockThreadedIO(); - - topush = MAX_IO_JOBS_QUEUE-jobs; - if (topush < 0) topush = 0; - if (topush > (signed)listLength(server.cache_io_queue)) - topush = listLength(server.cache_io_queue); - - while((ln = listFirst(server.cache_io_queue)) != NULL) { - ioop *op = ln->value; - struct dictEntry *de; - robj *val; - - if (!topush) break; - topush--; - - if (op->type != REDIS_IO_LOAD && flags & REDIS_IO_ONLYLOADS) break; - - /* Don't execute SAVE before the scheduled time for completion */ - if (op->type == REDIS_IO_SAVE && !(flags & REDIS_IO_ASAP) && - (now - op->ctime) < server.cache_flush_delay) break; - - /* Don't add a SAVE job in the IO thread queue if there is already - * a save in progress for the same key. */ - if (op->type == REDIS_IO_SAVE && - cacheScheduleIOGetFlags(op->db,op->key) & REDIS_IO_SAVEINPROG) - { - /* Move the operation at the end of the list if there - * are other operations, so we can try to process the next one. - * Otherwise break, nothing to do here. */ - if (listLength(server.cache_io_queue) > 1) { - listDelNode(server.cache_io_queue,ln); - listAddNodeTail(server.cache_io_queue,op); - continue; - } else { - break; - } - } - - redisLog(REDIS_DEBUG,"Creating IO %s Job for key %s", - op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr); - - if (op->type == REDIS_IO_LOAD) { - cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL,0); - } else { - time_t expire = -1; - - /* Lookup the key, in order to put the current value in the IO - * Job. Otherwise if the key does not exists we schedule a disk - * store delete operation, setting the value to NULL. */ - de = dictFind(op->db->dict,op->key->ptr); - if (de) { - val = dictGetEntryVal(de); - expire = getExpire(op->db,op->key); - } else { - /* Setting the value to NULL tells the IO thread to delete - * the key on disk. */ - val = NULL; - } - cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val,expire); - } - /* Mark the operation as in progress. */ - cacheScheduleIODelFlag(op->db,op->key,op->type); - cacheScheduleIOAddFlag(op->db,op->key, - (op->type == REDIS_IO_LOAD) ? REDIS_IO_LOADINPROG : - REDIS_IO_SAVEINPROG); - /* Finally remove the operation from the queue. - * But we'll have trace of it in the hash table. */ - listDelNode(server.cache_io_queue,ln); - decrRefCount(op->key); - zfree(op); - pushed++; - } - return pushed; -} - -void cacheCron(void) { - /* Push jobs */ - cacheScheduleIOPushJobs(0); - - /* Reclaim memory from the object cache */ - while (server.ds_enabled && zmalloc_used_memory() > - server.cache_max_memory) - { - int done = 0; - - if (cacheFreeOneEntry() == REDIS_OK) done++; - if (negativeCacheEvictOneEntry() == REDIS_OK) done++; - if (done == 0) break; /* nothing more to free */ - } -} - -/* ========== Disk store cache - Blocking clients on missing keys =========== */ - -/* This function makes the clinet 'c' waiting for the key 'key' to be loaded. - * If the key is already in memory we don't need to block. 
- * - * FIXME: we should try if it's actually better to suspend the client - * accessing an object that is being saved, and awake it only when - * the saving was completed. - * - * Otherwise if the key is not in memory, we block the client and start - * an IO Job to load it: - * - * the key is added to the io_keys list in the client structure, and also - * in the hash table mapping swapped keys to waiting clients, that is, - * server.io_waited_keys. */ -int waitForSwappedKey(redisClient *c, robj *key) { - struct dictEntry *de; - list *l; - - /* Return ASAP if the key is in memory */ - de = dictFind(c->db->dict,key->ptr); - if (de != NULL) return 0; - - /* Don't wait for keys we are sure are not on disk either */ - if (!cacheKeyMayExist(c->db,key)) return 0; - - /* Add the key to the list of keys this client is waiting for. - * This maps clients to keys they are waiting for. */ - listAddNodeTail(c->io_keys,key); - incrRefCount(key); - - /* Add the client to the swapped keys => clients waiting map. */ - de = dictFind(c->db->io_keys,key); - if (de == NULL) { - int retval; - - /* For every key we take a list of clients blocked for it */ - l = listCreate(); - retval = dictAdd(c->db->io_keys,key,l); - incrRefCount(key); - redisAssert(retval == DICT_OK); - } else { - l = dictGetEntryVal(de); - } - listAddNodeTail(l,c); - - /* Are we already loading the key from disk? If not create a job */ - if (de == NULL) { - int flags = cacheScheduleIOGetFlags(c->db,key); - - /* It is possible that even if there are no clients waiting for - * a load operation, still we have a load operation in progress. - * For instance think to a client performing a GET and then - * closing the connection */ - if ((flags & (REDIS_IO_LOAD|REDIS_IO_LOADINPROG)) == 0) - cacheScheduleIO(c->db,key,REDIS_IO_LOAD); - } - return 1; -} - -/* Is this client attempting to run a command against swapped keys? - * If so, block it ASAP, load the keys in background, then resume it. - * - * The important idea about this function is that it can fail! If keys will - * still be swapped when the client is resumed, this key lookups will - * just block loading keys from disk. In practical terms this should only - * happen with SORT BY command or if there is a bug in this function. - * - * Return 1 if the client is marked as blocked, 0 if the client can - * continue as the keys it is going to access appear to be in memory. */ -int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) { - int *keyindex, numkeys, j, i; - - /* EXEC is a special case, we need to preload all the commands - * queued into the transaction */ - if (cmd->proc == execCommand) { - struct redisCommand *mcmd; - robj **margv; - int margc; - - if (!(c->flags & REDIS_MULTI)) return 0; - for (i = 0; i < c->mstate.count; i++) { - mcmd = c->mstate.commands[i].cmd; - margc = c->mstate.commands[i].argc; - margv = c->mstate.commands[i].argv; - - keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys, - REDIS_GETKEYS_PRELOAD); - for (j = 0; j < numkeys; j++) { - redisLog(REDIS_DEBUG,"Preloading %s", - (char*)margv[keyindex[j]]->ptr); - waitForSwappedKey(c,margv[keyindex[j]]); - } - getKeysFreeResult(keyindex); - } - } else { - keyindex = getKeysFromCommand(cmd,c->argv,c->argc,&numkeys, - REDIS_GETKEYS_PRELOAD); - for (j = 0; j < numkeys; j++) { - redisLog(REDIS_DEBUG,"Preloading %s", - (char*)c->argv[keyindex[j]]->ptr); - waitForSwappedKey(c,c->argv[keyindex[j]]); - } - getKeysFreeResult(keyindex); - } - - /* If the client was blocked for at least one key, mark it as blocked. 
diff --git a/src/networking.c b/src/networking.c
index 8f2e6d8f..dbd83505 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -487,25 +487,6 @@ void freeClient(redisClient *c) {
         redisAssert(ln != NULL);
         listDelNode(server.unblocked_clients,ln);
     }
-    /* Remove from the list of clients waiting for swapped keys, or ready
-     * to be restarted, but not yet woken up again. */
-    if (c->flags & REDIS_IO_WAIT) {
-        redisAssert(server.ds_enabled);
-        if (listLength(c->io_keys) == 0) {
-            ln = listSearchKey(server.io_ready_clients,c);
-
-            /* When this client is waiting to be woken up (REDIS_IO_WAIT),
-             * it should be present in the list io_ready_clients */
-            redisAssert(ln != NULL);
-            listDelNode(server.io_ready_clients,ln);
-        } else {
-            while (listLength(c->io_keys)) {
-                ln = listFirst(c->io_keys);
-                dontWaitForSwappedKey(c,ln->value);
-            }
-        }
-        server.cache_blocked_clients--;
-    }
     listRelease(c->io_keys);
     /* Master/slave cleanup.
      * Case 1: we lost the connection with a slave. */
@@ -796,9 +777,6 @@ int processMultibulkBuffer(redisClient *c) {
 void processInputBuffer(redisClient *c) {
     /* Keep processing while there is something in the input buffer */
     while(sdslen(c->querybuf)) {
-        /* Immediately abort if the client is in the middle of something. */
-        if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
-
         /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
          * written to the client. Make sure to not let the reply grow after
          * this flag has been set (i.e. don't process more commands). */
@@ -907,7 +885,6 @@ void clientCommand(redisClient *c) {
             if (p == flags) *p++ = 'N';
             if (client->flags & REDIS_MULTI) *p++ = 'x';
             if (client->flags & REDIS_BLOCKED) *p++ = 'b';
-            if (client->flags & REDIS_IO_WAIT) *p++ = 'i';
             if (client->flags & REDIS_DIRTY_CAS) *p++ = 'd';
             if (client->flags & REDIS_CLOSE_AFTER_REPLY) *p++ = 'c';
             if (client->flags & REDIS_UNBLOCKED) *p++ = 'u';
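
The last hunk drops the 'i' letter from the flag string CLIENT LIST prints. A standalone sketch of that rendering idiom, where each set bit appends one letter and 'N' stands for "nothing interesting", follows; the F_* names are shortened stand-ins for the REDIS_* flags shown in the listing:

    #include <stdio.h>

    #define F_MULTI     8
    #define F_BLOCKED   16
    #define F_DIRTY_CAS 64

    /* Render a client's flag bits as a compact letter string. */
    static void flags_to_string(int f, char *out) {
        char *p = out;
        if (f & F_MULTI)     *p++ = 'x';
        if (f & F_BLOCKED)   *p++ = 'b';
        if (f & F_DIRTY_CAS) *p++ = 'd';
        if (p == out)        *p++ = 'N';  /* no flag letter was emitted */
        *p = '\0';
    }

    int main(void) {
        char buf[8];
        flags_to_string(F_MULTI | F_BLOCKED, buf);
        printf("%s\n", buf);  /* prints "xb" */
        return 0;
    }
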
diff --git a/src/object.c b/src/object.c
index 20e7f57a..ce13429a 100644
--- a/src/object.c
+++ b/src/object.c
@@ -1,5 +1,4 @@
 #include "redis.h"
-#include <pthread.h>
 #include <math.h>
 
 robj *createObject(int type, void *ptr) {
@@ -30,9 +29,7 @@ robj *createStringObject(char *ptr, size_t len) {
 robj *createStringObjectFromLongLong(long long value) {
     robj *o;
-    if (value >= 0 && value < REDIS_SHARED_INTEGERS &&
-        !server.ds_enabled &&
-        pthread_equal(pthread_self(),server.mainthread)) {
+    if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
         incrRefCount(shared.integers[value]);
         o = shared.integers[value];
     } else {
@@ -241,10 +238,7 @@ robj *tryObjectEncoding(robj *o) {
      * Note that we also avoid using shared integers when maxmemory is used
      * because every object needs to have a private LRU field for the LRU
      * algorithm to work well. */
-    if (!server.ds_enabled &&
-        server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
-        pthread_equal(pthread_self(),server.mainthread))
-    {
+    if (server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS) {
         decrRefCount(o);
         incrRefCount(shared.integers[value]);
         return shared.integers[value];
diff --git a/src/rdb.c b/src/rdb.c
index d019d94f..cfbec3e8 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -413,11 +413,6 @@ int rdbSave(char *filename) {
     int j;
     time_t now = time(NULL);
 
-    if (server.ds_enabled) {
-        cacheForcePointInTime();
-        return dsRdbSave(filename);
-    }
-
     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
     fp = fopen(tmpfile,"w");
     if (!fp) {
@@ -484,16 +479,10 @@ int rdbSaveBackground(char *filename) {
     pid_t childpid;
     long long start;
 
-    if (server.bgsavechildpid != -1 ||
-        server.bgsavethread != (pthread_t) -1) return REDIS_ERR;
+    if (server.bgsavechildpid != -1) return REDIS_ERR;
 
     server.dirty_before_bgsave = server.dirty;
 
-    if (server.ds_enabled) {
-        cacheForcePointInTime();
-        return dsRdbSaveBackground(filename);
-    }
-
     start = ustime();
     if ((childpid = fork()) == 0) {
         int retval;
@@ -1013,15 +1002,13 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
         rdbRemoveTempFile(server.bgsavechildpid);
     }
     server.bgsavechildpid = -1;
-    server.bgsavethread = (pthread_t) -1;
-    server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
     /* Possibly there are slaves waiting for a BGSAVE in order to be served
      * (the first stage of SYNC is a bulk transfer of dump.rdb) */
     updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
 }
 
 void saveCommand(redisClient *c) {
-    if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) {
+    if (server.bgsavechildpid != -1) {
         addReplyError(c,"Background save already in progress");
         return;
     }
@@ -1033,7 +1020,7 @@ void saveCommand(redisClient *c) {
 }
 
 void bgsaveCommand(redisClient *c) {
-    if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) {
+    if (server.bgsavechildpid != -1) {
         addReplyError(c,"Background save already in progress");
     } else if (server.bgrewritechildpid != -1) {
         addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
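
With the threaded dsRdbSaveBackground() path gone, rdb.c is back to the single copy-on-write pattern: fork(), let the child serialize its frozen image of memory, and have the parent keep serving while it waits for the child to report back. A minimal self-contained sketch of that pattern (save_snapshot and the file name are illustrative, not the Redis functions):

    #include <stdio.h>
    #include <unistd.h>
    #include <sys/wait.h>

    /* Stand-in for the real RDB serializer. */
    static int save_snapshot(const char *path) {
        FILE *fp = fopen(path, "w");
        if (!fp) return -1;
        fprintf(fp, "snapshot data\n");
        fclose(fp);
        return 0;
    }

    int main(void) {
        pid_t childpid = fork();
        if (childpid == 0) {
            /* Child: sees a copy-on-write image of the parent's memory. */
            _exit(save_snapshot("dump.rdb") == 0 ? 0 : 1);
        } else if (childpid > 0) {
            int status;
            /* Parent: would keep serving; here we just reap the child. */
            waitpid(childpid, &status, 0);
            printf("background save %s\n",
                   WIFEXITED(status) && WEXITSTATUS(status) == 0 ? "ok"
                                                                 : "err");
        }
        return 0;
    }

This is why the "one childpid at a time" checks in saveCommand/bgsaveCommand simplify to a single comparison: there is exactly one kind of background saver left.
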
diff --git a/src/redis.c b/src/redis.c
index 1a1f84f9..89efffba 100644
--- a/src/redis.c
+++ b/src/redis.c
@@ -50,7 +50,6 @@
 #include <limits.h>
 #include <float.h>
 #include <math.h>
-#include <pthread.h>
 #include <sys/resource.h>
 
 /* Our shared "common" objects */
@@ -659,22 +658,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
             }
             updateDictResizePolicy();
         }
-    } else if (server.bgsavethread != (pthread_t) -1) {
-        if (server.bgsavethread != (pthread_t) -1) {
-            int state;
-
-            pthread_mutex_lock(&server.bgsavethread_mutex);
-            state = server.bgsavethread_state;
-            pthread_mutex_unlock(&server.bgsavethread_mutex);
-
-            if (state == REDIS_BGSAVE_THREAD_DONE_OK ||
-                state == REDIS_BGSAVE_THREAD_DONE_ERR)
-            {
-                backgroundSaveDoneHandler(
-                    (state == REDIS_BGSAVE_THREAD_DONE_OK) ? 0 : 1, 0);
-            }
-        }
-    } else if (!server.ds_enabled) {
+    } else {
         time_t now = time(NULL);
 
         /* If there is not a background saving/rewrite in progress check if
@@ -712,10 +696,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * in order to guarantee a strict consistency. */
     if (server.masterhost == NULL) activeExpireCycle();
 
-    /* Remove a few cached objects from memory if we are over the
-     * configured memory limit */
-    if (server.ds_enabled) cacheCron();
-
     /* Replication cron function -- used to reconnect to master and
      * to detect transfer failures. */
     if (!(loops % 10)) replicationCron();
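
Note what disappears from serverCron here: the mutex-protected polling of a background *thread*'s state. The surviving fork()-based path is detected with ordinary non-blocking child reaping, which in generic terms looks like the standalone sketch below (the loop body and timings are illustrative; serverCron itself is driven by the event loop rather than a for loop):

    #include <stdio.h>
    #include <unistd.h>
    #include <sys/wait.h>

    int main(void) {
        pid_t child = fork();
        if (child == 0) { sleep(1); _exit(0); }  /* pretend background save */
        for (;;) {
            int status;
            /* Non-blocking check, the cron-friendly alternative to a
             * mutex-guarded thread-state variable. */
            if (waitpid(child, &status, WNOHANG) == child) {
                printf("background save done, exit code %d\n",
                       WEXITSTATUS(status));
                break;
            }
            usleep(100 * 1000);  /* the real cron ticks periodically */
        }
        return 0;
    }
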
@@ -735,31 +715,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
     listNode *ln;
     redisClient *c;
 
-    /* Awake clients that got all the on disk keys they requested */
-    if (server.ds_enabled && listLength(server.io_ready_clients)) {
-        listIter li;
-
-        listRewind(server.io_ready_clients,&li);
-        while((ln = listNext(&li))) {
-            c = ln->value;
-            struct redisCommand *cmd;
-
-            /* Resume the client. */
-            listDelNode(server.io_ready_clients,ln);
-            c->flags &= (~REDIS_IO_WAIT);
-            server.cache_blocked_clients--;
-            aeCreateFileEvent(server.el, c->fd, AE_READABLE,
-                readQueryFromClient, c);
-            cmd = lookupCommand(c->argv[0]->ptr);
-            redisAssert(cmd != NULL);
-            call(c,cmd);
-            resetClient(c);
-            /* There may be more data to process in the input buffer. */
-            if (c->querybuf && sdslen(c->querybuf) > 0)
-                processInputBuffer(c);
-        }
-    }
-
     /* Try to process pending commands for clients that were just unblocked. */
     while (listLength(server.unblocked_clients)) {
         ln = listFirst(server.unblocked_clients);
@@ -870,10 +825,6 @@ void initServerConfig() {
     server.maxmemory = 0;
     server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
     server.maxmemory_samples = 3;
-    server.ds_enabled = 0;
-    server.ds_path = sdsnew("/tmp/redis.ds");
-    server.cache_max_memory = 64LL*1024*1024; /* 64 MB of RAM */
-    server.cache_blocked_clients = 0;
     server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
     server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
     server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
@@ -882,7 +833,6 @@ void initServerConfig() {
     server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES;
     server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE;
     server.shutdown_asap = 0;
-    server.cache_flush_delay = 0;
     server.cluster_enabled = 0;
     server.cluster.configfile = zstrdup("nodes.conf");
 
@@ -930,12 +880,10 @@ void initServer() {
             server.syslog_facility);
     }
 
-    server.mainthread = pthread_self();
     server.clients = listCreate();
     server.slaves = listCreate();
     server.monitors = listCreate();
     server.unblocked_clients = listCreate();
-    server.cache_io_queue = listCreate();
 
     createSharedObjects();
     server.el = aeCreateEventLoop();
@@ -965,11 +913,6 @@ void initServer() {
         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
-        if (server.ds_enabled) {
-            server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
-            server.db[j].io_negcache = dictCreate(&setDictType,NULL);
-            server.db[j].io_queued = dictCreate(&setDictType,NULL);
-        }
         server.db[j].id = j;
     }
     server.pubsub_channels = dictCreate(&keylistDictType,NULL);
@@ -979,8 +922,6 @@ void initServer() {
     server.cronloops = 0;
    server.bgsavechildpid = -1;
     server.bgrewritechildpid = -1;
-    server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
-    server.bgsavethread = (pthread_t) -1;
     server.bgrewritebuf = sdsempty();
     server.aofbuf = sdsempty();
     server.lastsave = time(NULL);
@@ -1010,7 +951,6 @@ void initServer() {
         }
     }
 
-    if (server.ds_enabled) dsInit();
     if (server.cluster_enabled) clusterInit();
     srand(time(NULL)^getpid());
 }
@@ -1188,8 +1128,6 @@ int processCommand(redisClient *c) {
         queueMultiCommand(c,cmd);
         addReply(c,shared.queued);
     } else {
-        if (server.ds_enabled && blockClientOnSwappedKeys(c,cmd))
-            return REDIS_ERR;
         call(c,cmd);
     }
     return REDIS_OK;
@@ -1207,9 +1145,7 @@ int prepareForShutdown() {
         kill(server.bgsavechildpid,SIGKILL);
         rdbRemoveTempFile(server.bgsavechildpid);
     }
-    if (server.ds_enabled) {
-        /* FIXME: flush all objects on disk */
-    } else if (server.appendonly) {
+    if (server.appendonly) {
        /* Append only file: fsync() the AOF and exit */
         aof_fsync(server.appendfd);
     } else if (server.saveparamslen > 0) {
@@ -1391,8 +1327,7 @@ sds genRedisInfoString(char *section) {
             server.loading,
             server.appendonly,
             server.dirty,
-            server.bgsavechildpid != -1 ||
-            server.bgsavethread != (pthread_t) -1,
+            server.bgsavechildpid != -1,
             server.lastsave,
             server.bgrewritechildpid != -1);
 
@@ -1438,35 +1373,6 @@ sds genRedisInfoString(char *section) {
         }
     }
 
-    /* Diskstore */
-    if (allsections || defsections || !strcasecmp(section,"diskstore")) {
-        if (sections++) info = sdscat(info,"\r\n");
-        info = sdscatprintf(info,
-            "# Diskstore\r\n"
-            "ds_enabled:%d\r\n",
-            server.ds_enabled != 0);
-        if (server.ds_enabled) {
-            lockThreadedIO();
-            info = sdscatprintf(info,
-                "cache_max_memory:%llu\r\n"
-                "cache_blocked_clients:%lu\r\n"
-                "cache_io_queue_len:%lu\r\n"
-                "cache_io_jobs_new:%lu\r\n"
-                "cache_io_jobs_processing:%lu\r\n"
-                "cache_io_jobs_processed:%lu\r\n"
-                "cache_io_ready_clients:%lu\r\n"
-                ,(unsigned long long) server.cache_max_memory,
-                (unsigned long) server.cache_blocked_clients,
-                (unsigned long) listLength(server.cache_io_queue),
-                (unsigned long) listLength(server.io_newjobs),
-                (unsigned long) listLength(server.io_processing),
-                (unsigned long) listLength(server.io_processed),
-                (unsigned long) listLength(server.io_ready_clients)
-            );
-            unlockThreadedIO();
-        }
-    }
-
     /* Stats */
     if (allsections || defsections || !strcasecmp(section,"stats")) {
         if (sections++) info = sdscat(info,"\r\n");
@@ -1824,9 +1730,7 @@ int main(int argc, char **argv) {
         linuxOvercommitMemoryWarning();
 #endif
     start = ustime();
-    if (server.ds_enabled) {
-        redisLog(REDIS_NOTICE,"DB not loaded (running with disk back end)");
-    } else if (server.appendonly) {
+    if (server.appendonly) {
         if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
             redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
     } else {
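
The genRedisInfoString hunk removes the whole "# Diskstore" section, but the surrounding section machinery stays: each named block is appended only when requested, with a "\r\n" separator once more than one section has been emitted. A toy version of that assembly, using plain C strings in place of the sds helpers:

    #include <stdio.h>
    #include <string.h>

    int main(void) {
        char info[256] = "";
        const char *section = "persistence";  /* as if "INFO persistence" */
        int sections = 0;

        if (strcmp(section, "persistence") == 0 ||
            strcmp(section, "all") == 0) {
            if (sections++) strcat(info, "\r\n");  /* separator from 2nd on */
            strcat(info, "# Persistence\r\nbgsave_in_progress:0\r\n");
        }
        /* After this patch there is no "# Diskstore" block to append. */
        fputs(info, stdout);
        return 0;
    }
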
"cache_io_queue_len:%lu\r\n" - "cache_io_jobs_new:%lu\r\n" - "cache_io_jobs_processing:%lu\r\n" - "cache_io_jobs_processed:%lu\r\n" - "cache_io_ready_clients:%lu\r\n" - ,(unsigned long long) server.cache_max_memory, - (unsigned long) server.cache_blocked_clients, - (unsigned long) listLength(server.cache_io_queue), - (unsigned long) listLength(server.io_newjobs), - (unsigned long) listLength(server.io_processing), - (unsigned long) listLength(server.io_processed), - (unsigned long) listLength(server.io_ready_clients) - ); - unlockThreadedIO(); - } - } - /* Stats */ if (allsections || defsections || !strcasecmp(section,"stats")) { if (sections++) info = sdscat(info,"\r\n"); @@ -1824,9 +1730,7 @@ int main(int argc, char **argv) { linuxOvercommitMemoryWarning(); #endif start = ustime(); - if (server.ds_enabled) { - redisLog(REDIS_NOTICE,"DB not loaded (running with disk back end)"); - } else if (server.appendonly) { + if (server.appendonly) { if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { diff --git a/src/redis.h b/src/redis.h index cab7607e..a5cd15ed 100644 --- a/src/redis.h +++ b/src/redis.h @@ -124,26 +124,12 @@ #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ -/* Scheduled IO opeations flags. */ -#define REDIS_IO_LOAD 1 -#define REDIS_IO_SAVE 2 -#define REDIS_IO_LOADINPROG 4 -#define REDIS_IO_SAVEINPROG 8 - -/* Generic IO flags */ -#define REDIS_IO_ONLYLOADS 1 -#define REDIS_IO_ASAP 2 - -#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 -#define REDIS_THREAD_STACK_SIZE (1024*1024*4) - /* Client flags */ #define REDIS_SLAVE 1 /* This client is a slave server */ #define REDIS_MASTER 2 /* This client is a master server */ #define REDIS_MONITOR 4 /* This client is a slave monitor, see MONITOR */ #define REDIS_MULTI 8 /* This client is in a MULTI context */ #define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */ -#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ #define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */ #define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. 
@@ -222,12 +208,6 @@
 #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
 #define REDIS_MAXMEMORY_NO_EVICTION 5
 
-/* Diskstore background saving thread states */
-#define REDIS_BGSAVE_THREAD_UNACTIVE 0
-#define REDIS_BGSAVE_THREAD_ACTIVE 1
-#define REDIS_BGSAVE_THREAD_DONE_OK 2
-#define REDIS_BGSAVE_THREAD_DONE_ERR 3
-
 /* We can print the stacktrace, so our assert is defined this way: */
 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
 #define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1)
@@ -292,9 +272,6 @@ typedef struct redisDb {
     dict *dict;                 /* The keyspace for this DB */
     dict *expires;              /* Timeout of keys with a timeout set */
     dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
-    dict *io_keys;              /* Keys with clients waiting for DS I/O */
-    dict *io_negcache;          /* Negative caching for disk store */
-    dict *io_queued;            /* Queued IO operations hash table */
     dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
     int id;
 } redisDb;
@@ -516,7 +493,6 @@ typedef struct {
 
 struct redisServer {
     /* General */
-    pthread_t mainthread;
     redisDb *db;
     dict *commands;             /* Command table hahs table */
     aeEventLoop *el;
@@ -574,9 +550,6 @@ struct redisServer {
     char *pidfile;
     pid_t bgsavechildpid;
     pid_t bgrewritechildpid;
-    int bgsavethread_state;
-    pthread_mutex_t bgsavethread_mutex;
-    pthread_t bgsavethread;
     sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
     sds aofbuf;       /* AOF buffer, written before entering the event loop */
     struct saveparam *saveparams;
@@ -612,19 +585,12 @@ struct redisServer {
     int maxmemory_samples;
     /* Blocked clients */
     unsigned int bpop_blocked_clients;
-    unsigned int cache_blocked_clients;
     list *unblocked_clients; /* list of clients to unblock before next loop */
-    list *cache_io_queue;    /* IO operations queue */
-    int cache_flush_delay;   /* seconds to wait before flushing keys */
     /* Sort parameters - qsort_r() is only available under BSD so we
      * have to take this state global, in order to pass it to sortCompare() */
     int sort_desc;
     int sort_alpha;
     int sort_bypattern;
-    /* Virtual memory configuration */
-    int ds_enabled;      /* backend disk in redis.conf */
-    char *ds_path;       /* location of the disk store on disk */
-    unsigned long long cache_max_memory;
     /* Zip structure config */
     size_t hash_max_zipmap_entries;
     size_t hash_max_zipmap_value;
@@ -682,7 +648,7 @@ struct redisCommand {
     int arity;
     int flags;
     /* Use a function to determine keys arguments in a command line.
-     * Used both for diskstore preloading and Redis Cluster. */
+     * Used for Redis Cluster redirect. */
     redisGetKeysProc *getkeys_proc;
     /* What keys should be loaded in background when calling this command? */
     int firstkey; /* The first argument that's a key (0 = no keys) */
@@ -709,27 +675,6 @@ typedef struct _redisSortOperation {
     robj *pattern;
 } redisSortOperation;
 
-/* DIsk store threaded I/O request message */
-#define REDIS_IOJOB_LOAD 0
-#define REDIS_IOJOB_SAVE 1
-
-typedef struct iojob {
-    int type;   /* Request type, REDIS_IOJOB_* */
-    redisDb *db;/* Redis database */
-    robj *key;  /* This I/O request is about this key */
-    robj *val;  /* the value to swap for REDIS_IOJOB_SAVE, otherwise this
-                 * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
-    time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */
-} iojob;
-
-/* IO operations scheduled -- check dscache.c for more info */
-typedef struct ioop {
-    int type;
-    redisDb *db;
-    robj *key;
-    time_t ctime; /* This is the creation time of the entry. */
-} ioop;
-
 /* Structure to hold list iteration abstraction. */
 typedef struct {
     robj *subject;
@@ -973,40 +918,6 @@ void oom(const char *msg);
 void populateCommandTable(void);
 void resetCommandTableStats(void);
 
-/* Disk store */
-int dsOpen(void);
-int dsClose(void);
-int dsSet(redisDb *db, robj *key, robj *val, time_t expire);
-robj *dsGet(redisDb *db, robj *key, time_t *expire);
-int dsDel(redisDb *db, robj *key);
-int dsExists(redisDb *db, robj *key);
-void dsFlushDb(int dbid);
-int dsRdbSaveBackground(char *filename);
-int dsRdbSave(char *filename);
-
-/* Disk Store Cache */
-void dsInit(void);
-void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask);
-void lockThreadedIO(void);
-void unlockThreadedIO(void);
-void freeIOJob(iojob *j);
-void queueIOJob(iojob *j);
-void waitEmptyIOJobsQueue(void);
-void processAllPendingIOJobs(void);
-int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
-int dontWaitForSwappedKey(redisClient *c, robj *key);
-void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
-int cacheFreeOneEntry(void);
-void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag);
-void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag);
-int cacheScheduleIOGetFlags(redisDb *db, robj *key);
-void cacheScheduleIO(redisDb *db, robj *key, int type);
-void cacheCron(void);
-int cacheKeyMayExist(redisDb *db, robj *key);
-void cacheSetKeyMayExist(redisDb *db, robj *key);
-void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
-void cacheForcePointInTime(void);
-
 /* Set data type */
 robj *setTypeCreate(robj *value);
 int setTypeAdd(robj *subject, robj *value);
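
One comment change above is easy to miss: getkeys_proc in struct redisCommand is no longer documented as serving diskstore preloading, only Cluster redirection. The pattern it refers to is that most commands describe their key positions declaratively (firstkey/lastkey/keystep) and only commands with irregular argument layouts carry a callback. A simplified standalone sketch of that table shape; the Command type, zunionstore_keys and field names are stand-ins, not the real redisCommand:

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct {
        const char *name;
        int firstkey, lastkey, keystep;        /* simple, declarative case */
        int (*getkeys)(int argc, char **argv); /* NULL unless layout is odd */
    } Command;

    /* ZUNIONSTORE dest numkeys key [key ...]: keys start at argv[3]. */
    static int zunionstore_keys(int argc, char **argv) {
        int numkeys = atoi(argv[2]);
        return numkeys < argc - 3 ? numkeys : argc - 3;  /* clamp to argc */
    }

    int main(void) {
        Command get = {"get", 1, 1, 1, NULL};
        Command zu  = {"zunionstore", 0, 0, 0, zunionstore_keys};
        char *argv[] = {"zunionstore", "dest", "2", "a", "b"};

        printf("%s: key is argument %d\n", get.name, get.firstkey);
        printf("%s: %d keys found via callback\n", zu.name,
               zu.getkeys(5, argv));
        return 0;
    }
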
diff --git a/tests/assets/default.conf b/tests/assets/default.conf
index 150eb690..75334426 100644
--- a/tests/assets/default.conf
+++ b/tests/assets/default.conf
@@ -291,32 +291,6 @@ appendfsync everysec
 # "no" that is the safest pick from the point of view of durability.
 no-appendfsync-on-rewrite no
 
-#################################### DISK STORE ###############################
-
-# When disk store is active Redis works as an on-disk database, where memory
-# is only used as a object cache.
-#
-# This mode is good for datasets that are bigger than memory, and in general
-# when you want to trade speed for:
-#
-# - less memory used
-# - immediate server restart
-# - per key durability, without need for backgrond savig
-#
-# On the other hand, with disk store enabled MULTI/EXEC are no longer
-# transactional from the point of view of the persistence on disk, that is,
-# Redis transactions will still guarantee that commands are either processed
-# all or nothing, but there is no guarantee that all the keys are flushed
-# on disk in an atomic way.
-#
-# Of course with disk store enabled Redis is not as fast as it is when
-# working with just the memory back end.
-
-diskstore-enabled no
-diskstore-path redis.ds
-cache-max-memory 0
-cache-flush-delay 0
-
 ############################### ADVANCED CONFIG ###############################
 
 # Hashes are encoded in a special way (much more memory efficient) when they
-- 
2.47.2