X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/08ee9b570f4a9a181c3ed2242c5b2f6a88eddc41..dda20542abcccac04a7f630695e6a30304f7dcf8:/redis.c diff --git a/redis.c b/redis.c index 38aad0a8..1882d742 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "1.3.8" +#define REDIS_VERSION "2.1.1" #include "fmacros.h" #include "config.h" @@ -37,8 +37,6 @@ #include #include #include -#define __USE_POSIX199309 -#define __USE_UNIX98 #include #ifdef HAVE_BACKTRACE @@ -59,6 +57,7 @@ #include #include #include +#include #include #include @@ -75,7 +74,10 @@ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ #include "lzf.h" /* LZF compression library */ #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ -#include "zipmap.h" +#include "zipmap.h" /* Compact dictionary-alike data structure */ +#include "ziplist.h" /* Compact list data structure */ +#include "sha1.h" /* SHA1 is used for DEBUG DIGEST */ +#include "release.h" /* Release and/or git repository information */ /* Error codes */ #define REDIS_OK 0 @@ -119,17 +121,20 @@ #define REDIS_SET 2 #define REDIS_ZSET 3 #define REDIS_HASH 4 +#define REDIS_VMPOINTER 8 /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. 
*/ -#define REDIS_ENCODING_RAW 0 /* Raw representation */ -#define REDIS_ENCODING_INT 1 /* Encoded as integer */ -#define REDIS_ENCODING_ZIPMAP 2 /* Encoded as zipmap */ -#define REDIS_ENCODING_HT 3 /* Encoded as an hash table */ +#define REDIS_ENCODING_RAW 0 /* Raw representation */ +#define REDIS_ENCODING_INT 1 /* Encoded as integer */ +#define REDIS_ENCODING_HT 2 /* Encoded as hash table */ +#define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ +#define REDIS_ENCODING_LIST 4 /* Encoded as a regular list */ +#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ static char* strencoding[] = { - "raw", "int", "zipmap", "hashtable" + "raw", "int", "hashtable", "zipmap", "list", "ziplist" }; /* Object types only used for dumping to disk */ @@ -189,6 +194,7 @@ static char* strencoding[] = { #define REDIS_MULTI 8 /* This client is in a MULTI context */ #define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */ #define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ +#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. 
*/ /* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ @@ -231,9 +237,11 @@ static char* strencoding[] = { #define APPENDFSYNC_ALWAYS 1 #define APPENDFSYNC_EVERYSEC 2 -/* Hashes related defaults */ +/* Zip structure related defaults */ #define REDIS_HASH_MAX_ZIPMAP_ENTRIES 64 #define REDIS_HASH_MAX_ZIPMAP_VALUE 512 +#define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024 +#define REDIS_LIST_MAX_ZIPLIST_VALUE 32 /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) @@ -245,30 +253,41 @@ static void _redisPanic(char *msg, char *file, int line); /* A redis object, that is a type able to hold a string / list / set */ -/* The VM object structure */ -struct redisObjectVM { - off_t page; /* the page at witch the object is stored on disk */ - off_t usedpages; /* number of pages used on disk */ - time_t atime; /* Last access time */ -} vm; - /* The actual Redis Object */ typedef struct redisObject { - void *ptr; - unsigned char type; - unsigned char encoding; - unsigned char storage; /* If this object is a key, where is the value? - * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */ - unsigned char vtype; /* If this object is a key, and value is swapped out, - * this is the type of the swapped out object. */ + unsigned type:4; + unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */ + unsigned encoding:4; + unsigned lru:22; /* lru time (relative to server.lruclock) */ int refcount; + void *ptr; /* VM fields, this are only allocated if VM is active, otherwise the * object allocation function will just allocate * sizeof(redisObjct) minus sizeof(redisObjectVM), so using * Redis without VM active will not have any overhead. */ - struct redisObjectVM vm; } robj; +/* The VM pointer structure - identifies an object in the swap file. 
+ * + * This object is stored in place of the value + * object in the main key->value hash table representing a database. + * Note that the first fields (type, storage) are the same as the redisObject + * structure so that vmPointer structures can be accessed even when cast + * as redisObject structures. + * + * This is useful as we don't know if a value object is on disk or not, but we + * are always able to read obj->storage to check this. For vmPointer + * structures "type" is set to REDIS_VMPOINTER (even though without this field + * it is still possible to check the kind of object from the value of 'storage').*/ +typedef struct vmPointer { + unsigned type:4; + unsigned storage:2; /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ + unsigned notused:26; + unsigned int vtype; /* type of the object stored in the swap file */ + off_t page; /* the page at which the object is stored on disk */ + off_t usedpages; /* number of pages used on disk */ +} vmpointer; + /* Macro used to initalize a Redis object allocated on the stack. 
* Note that this macro is taken near the structure definition to make sure * we'll update it when the structure is changed, to avoid bugs like @@ -278,14 +297,15 @@ typedef struct redisObject { _var.type = REDIS_STRING; \ _var.encoding = REDIS_ENCODING_RAW; \ _var.ptr = _ptr; \ - if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \ + _var.storage = REDIS_VM_MEMORY; \ } while(0); typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ - dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */ + dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ dict *io_keys; /* Keys with clients waiting for VM I/O */ + dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -323,13 +343,14 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj **blockingkeys; /* The key we are waiting to terminate a blocking + robj **blocking_keys; /* The key we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ - int blockingkeysnum; /* Number of blocking keys */ + int blocking_keys_num; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time * is >= blockingto then the operation timed out. */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. 
*/ + list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ } redisClient; @@ -365,6 +386,8 @@ struct redisServer { int daemonize; int appendonly; int appendfsync; + int no_appendfsync_on_rewrite; + int shutdown_asap; time_t lastfsync; int appendfd; int appendseldb; @@ -372,6 +395,7 @@ struct redisServer { pid_t bgsavechildpid; pid_t bgrewritechildpid; sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ + sds aofbuf; /* AOF buffer, written before entering the event loop */ struct saveparam *saveparams; int saveparamslen; char *logfile; @@ -379,7 +403,6 @@ struct redisServer { char *dbfilename; char *appendfilename; char *requirepass; - int shareobjects; int rdbcompression; int activerehashing; /* Replication related */ @@ -404,9 +427,11 @@ struct redisServer { off_t vm_page_size; off_t vm_pages; unsigned long long vm_max_memory; - /* Hashes config */ + /* Zip structure config */ size_t hash_max_zipmap_entries; size_t hash_max_zipmap_value; + size_t list_max_ziplist_entries; + size_t list_max_ziplist_value; /* Virtual memory state */ FILE *vm_fp; int vm_fd; @@ -444,6 +469,8 @@ struct redisServer { list *pubsub_patterns; /* A list of pubsub_patterns */ /* Misc */ FILE *devnull; + unsigned lruclock:22; /* clock incrementing every minute, for LRU */ + unsigned lruclock_padding:10; }; typedef struct pubsubPattern { @@ -452,6 +479,7 @@ typedef struct pubsubPattern { } pubsubPattern; typedef void redisCommandProc(redisClient *c); +typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); struct redisCommand { char *name; redisCommandProc *proc; @@ -460,7 +488,7 @@ struct redisCommand { /* Use a function to determine which keys need to be loaded * in the background prior to executing this command. 
Takes precedence * over vm_firstkey and others, ignored when NULL */ - redisCommandProc *vm_preload_proc; + redisVmPreloadProc *vm_preload_proc; /* What keys should be loaded in background when calling this command? */ int vm_firstkey; /* The first argument that's a key (0 = no keys) */ int vm_lastkey; /* THe last argument that's a key */ @@ -516,8 +544,9 @@ struct sharedObjectsStruct { *outofrangeerr, *plus, *select0, *select1, *select2, *select3, *select4, *select5, *select6, *select7, *select8, *select9, - *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3, - *psubscribebulk, *punsubscribebulk, *integers[REDIS_SHARED_INTEGERS]; + *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3, + *mbulk4, *psubscribebulk, *punsubscribebulk, + *integers[REDIS_SHARED_INTEGERS]; } shared; /* Global vars that are actally used as constants. The following double @@ -534,6 +563,9 @@ typedef struct iojob { int type; /* Request type, REDIS_IOJOB_* */ redisDb *db;/* Redis database */ robj *key; /* This I/O request is about swapping this key */ + robj *id; /* Unique identifier of this job: + this is the object to swap for REDIS_IOREQ_*_SWAP, or the + vmpointer object for REDIS_IOREQ_LOAD. */ robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this * field is populated by the I/O thread for REDIS_IOREQ_LOAD. 
*/ off_t page; /* Swap page where to read/write the object */ @@ -558,6 +590,8 @@ static int rdbSaveBackground(char *filename); static robj *createStringObject(char *ptr, size_t len); static robj *dupStringObject(robj *o); static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); +static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc); +static void flushAppendOnlyFile(void); static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); static robj *tryObjectEncoding(robj *o); @@ -565,8 +599,7 @@ static robj *getDecodedObject(robj *o); static int removeExpire(redisDb *db, robj *key); static int expireIfNeeded(redisDb *db, robj *key); static int deleteIfVolatile(redisDb *db, robj *key); -static int deleteIfSwapped(redisDb *db, robj *key); -static int deleteKey(redisDb *db, robj *key); +static int dbDelete(redisDb *db, robj *key); static time_t getExpire(redisDb *db, robj *key); static int setExpire(redisDb *db, robj *key, time_t when); static void updateSlavesWaitingBgsave(int bgsaveerr); @@ -588,8 +621,8 @@ static void unblockClientWaitingData(redisClient *c); static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele); static void vmInit(void); static void vmMarkPagesFree(off_t page, off_t count); -static robj *vmLoadObject(robj *key); -static robj *vmPreviewObject(robj *key); +static robj *vmLoadObject(robj *o); +static robj *vmPreviewObject(robj *o); static int vmSwapOneObjectBlocking(void); static int vmSwapOneObjectThreaded(void); static int vmCanSwapOut(void); @@ -607,8 +640,9 @@ static robj *vmReadObjectFromSwap(off_t page, int type); static void waitEmptyIOJobsQueue(void); static void vmReopenSwapFile(void); static int vmFreePage(off_t page); -static void zunionInterBlockClientOnSwappedKeys(redisClient *c); -static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c); +static void 
zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); +static void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); +static int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); static int dontWaitForSwappedKey(redisClient *c, robj *key); static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); @@ -616,18 +650,27 @@ static struct redisCommand *lookupCommand(char *name); static void call(redisClient *c, struct redisCommand *cmd); static void resetClient(redisClient *c); static void convertToRealHash(robj *o); +static void listTypeConvert(robj *o, int enc); static int pubsubUnsubscribeAllChannels(redisClient *c, int notify); static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify); static void freePubsubPattern(void *p); static int listMatchPubsubPattern(void *a, void *b); static int compareStringObjects(robj *a, robj *b); +static int equalStringObjects(robj *a, robj *b); static void usage(); +static int rewriteAppendOnlyFileBackground(void); +static vmpointer *vmSwapObjectBlocking(robj *val); +static int prepareForShutdown(); +static void touchWatchedKey(redisDb *db, robj *key); +static void touchWatchedKeysOnFlush(int dbid); +static void unwatchAllKeys(redisClient *c); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); static void echoCommand(redisClient *c); static void setCommand(redisClient *c); static void setnxCommand(redisClient *c); +static void setexCommand(redisClient *c); static void getCommand(redisClient *c); static void delCommand(redisClient *c); static void existsCommand(redisClient *c); @@ -714,8 +757,8 @@ static void hmgetCommand(redisClient *c); static void hdelCommand(redisClient *c); static void hlenCommand(redisClient *c); static void zremrangebyrankCommand(redisClient *c); -static void 
zunionCommand(redisClient *c); -static void zinterCommand(redisClient *c); +static void zunionstoreCommand(redisClient *c); +static void zinterstoreCommand(redisClient *c); static void hkeysCommand(redisClient *c); static void hvalsCommand(redisClient *c); static void hgetallCommand(redisClient *c); @@ -727,15 +770,19 @@ static void unsubscribeCommand(redisClient *c); static void psubscribeCommand(redisClient *c); static void punsubscribeCommand(redisClient *c); static void publishCommand(redisClient *c); +static void watchCommand(redisClient *c); +static void unwatchCommand(redisClient *c); /*================================= Globals ================================= */ /* Global vars */ static struct redisServer server; /* server global state */ -static struct redisCommand cmdTable[] = { +static struct redisCommand *commandTable; +static struct redisCommand readonlyCommandTable[] = { {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, + {"setex",setexCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"substr",substrCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, {"del",delCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, @@ -775,8 +822,8 @@ static struct redisCommand cmdTable[] = { {"zrem",zremCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zremrangebyrank",zremrangebyrankCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zunion",zunionCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, - {"zinter",zinterCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, + {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, + 
{"zinterstore",zinterstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zcount",zcountCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, @@ -821,7 +868,7 @@ static struct redisCommand cmdTable[] = { {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"type",typeCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"multi",multiCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"exec",execCommand,1,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,0,0,0}, + {"exec",execCommand,1,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,execBlockClientOnSwappedKeys,0,0,0}, {"discard",discardCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"sync",syncCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, @@ -838,7 +885,8 @@ static struct redisCommand cmdTable[] = { {"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0}, {"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0}, - {NULL,NULL,0,0,NULL,0,0,0} + {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, + {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0} }; /*============================ Utility functions ============================ */ @@ -970,6 +1018,77 @@ static int stringmatch(const char *pattern, const char *string, int nocase) { return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase); } +/* Convert a string representing an amount of memory into the number of + * bytes, so for instance memtoll("1Gi") will return 1073741824 that is + * (1024*1024*1024). 
+ * + * If err is not NULL, *err is set to 1 on parsing error, otherwise it's + * set to 0. */ +static long long memtoll(const char *p, int *err) { + const char *u; + char buf[128]; + long mul; /* unit multiplier */ + long long val; + unsigned int digits; + + if (err) *err = 0; + /* Search the first non digit character. */ + u = p; + if (*u == '-') u++; + while(*u && isdigit(*u)) u++; + if (*u == '\0' || !strcasecmp(u,"b")) { + mul = 1; + } else if (!strcasecmp(u,"k")) { + mul = 1000; + } else if (!strcasecmp(u,"kb")) { + mul = 1024; + } else if (!strcasecmp(u,"m")) { + mul = 1000*1000; + } else if (!strcasecmp(u,"mb")) { + mul = 1024*1024; + } else if (!strcasecmp(u,"g")) { + mul = 1000L*1000*1000; + } else if (!strcasecmp(u,"gb")) { + mul = 1024L*1024*1024; + } else { + if (err) *err = 1; + mul = 1; + } + digits = u-p; + if (digits >= sizeof(buf)) { + if (err) *err = 1; + return LLONG_MAX; + } + memcpy(buf,p,digits); + buf[digits] = '\0'; + val = strtoll(buf,NULL,10); + return val*mul; +} + +/* Convert a long long into a string. Returns the number of characters + * written (nul terminator excluded), which is fewer than the number needs + * if the passed buffer length is not enough to store the whole number. */ +static int ll2string(char *s, size_t len, long long value) { + char buf[32], *p; + unsigned long long v; + size_t l; + + if (len == 0) return 0; + v = (value < 0) ? -value : value; + p = buf+31; /* point to the last character */ + do { + *p-- = '0'+(v%10); + v /= 10; + } while(v); + if (value < 0) *p-- = '-'; + p++; + l = 32-(p-buf); + if (l+1 > len) l = len-1; /* Make sure it fits, including the nul term */ + memcpy(s,p,l); + s[l] = '\0'; + return l; +} + static void redisLog(int level, const char *fmt, ...) 
{ va_list ap; FILE *fp; @@ -1013,7 +1132,7 @@ static void dictListDestructor(void *privdata, void *val) listRelease((list*)val); } -static int sdsDictKeyCompare(void *privdata, const void *key1, +static int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2) { int l1,l2; @@ -1033,11 +1152,18 @@ static void dictRedisObjectDestructor(void *privdata, void *val) decrRefCount(val); } +static void dictSdsDestructor(void *privdata, void *val) +{ + DICT_NOTUSED(privdata); + + sdsfree(val); +} + static int dictObjKeyCompare(void *privdata, const void *key1, const void *key2) { const robj *o1 = key1, *o2 = key2; - return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr); + return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); } static unsigned int dictObjHash(const void *key) { @@ -1045,6 +1171,10 @@ static unsigned int dictObjHash(const void *key) { return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); } +static unsigned int dictSdsHash(const void *key) { + return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); +} + static int dictEncObjKeyCompare(void *privdata, const void *key1, const void *key2) { @@ -1052,12 +1182,12 @@ static int dictEncObjKeyCompare(void *privdata, const void *key1, int cmp; if (o1->encoding == REDIS_ENCODING_INT && - o2->encoding == REDIS_ENCODING_INT && - o1->ptr == o2->ptr) return 1; + o2->encoding == REDIS_ENCODING_INT) + return o1->ptr == o2->ptr; o1 = getDecodedObject(o1); o2 = getDecodedObject(o2); - cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr); + cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); decrRefCount(o1); decrRefCount(o2); return cmp; @@ -1073,7 +1203,7 @@ static unsigned int dictEncObjHash(const void *key) { char buf[32]; int len; - len = snprintf(buf,32,"%ld",(long)o->ptr); + len = ll2string(buf,32,(long)o->ptr); return dictGenHashFunction((unsigned char*)buf, len); } else { unsigned int hash; @@ -1086,7 +1216,7 @@ static unsigned int dictEncObjHash(const void *key) { } } -/* Sets type and expires */ 
+/* Sets type */ static dictType setDictType = { dictEncObjHash, /* hash function */ NULL, /* key dup */ @@ -1106,23 +1236,23 @@ static dictType zsetDictType = { dictVanillaFree /* val destructor of malloc(sizeof(double)) */ }; -/* Db->dict */ +/* Db->dict, keys are sds strings, vals are Redis objects. */ static dictType dbDictType = { - dictObjHash, /* hash function */ + dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictObjKeyCompare, /* key compare */ - dictRedisObjectDestructor, /* key destructor */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ dictRedisObjectDestructor /* val destructor */ }; /* Db->expires */ static dictType keyptrDictType = { - dictObjHash, /* hash function */ + dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictObjKeyCompare, /* key compare */ - dictRedisObjectDestructor, /* key destructor */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; @@ -1291,7 +1421,7 @@ void backgroundRewriteDoneHandler(int statloc) { /* If append only is actually enabled... */ close(server.appendfd); server.appendfd = fd; - fsync(fd); + if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd); server.appendseldb = -1; /* Make sure it will issue SELECT */ redisLog(REDIS_NOTICE,"The new append only file was selected for future appends."); } else { @@ -1337,6 +1467,26 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD * in objects at every object access, and accuracy is not needed. * To access a global var is faster than calling time(NULL) */ server.unixtime = time(NULL); + /* We have just 21 bits per object for LRU information. + * So we use an (eventually wrapping) LRU clock with minutes resolution. + * + * When we need to select what object to swap, we compute the minimum + * time distance between the current lruclock and the object last access + * lruclock info. 
Even if clocks will wrap on overflow, there is + * the interesting property that we are sure that at least + * ABS(A-B) minutes passed between current time and timestamp B. + * + * This is not precise but we don't need at all precision, but just + * something statistically reasonable. + */ + server.lruclock = (time(NULL)/60)&((1<<21)-1); + + /* We received a SIGTERM, shutting down here in a safe way, as it is + * not ok doing so inside the signal handler. */ + if (server.shutdown_asap) { + if (prepareForShutdown() == REDIS_OK) exit(0); + redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); + } /* Show some info about non-empty databases */ for (j = 0; j < server.dbnum; j++) { @@ -1428,7 +1578,11 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD if ((de = dictGetRandomKey(db->expires)) == NULL) break; t = (time_t) dictGetEntryVal(de); if (now > t) { - deleteKey(db,dictGetEntryKey(de)); + sds key = dictGetEntryKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + + dbDelete(db,keyobj); + decrRefCount(keyobj); expired++; server.stat_expiredkeys++; } @@ -1467,6 +1621,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD redisLog(REDIS_NOTICE,"Connecting to MASTER..."); if (syncWithMaster() == REDIS_OK) { redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded"); + if (server.appendonly) rewriteAppendOnlyFileBackground(); } } return 100; @@ -1478,6 +1633,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD static void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); + /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; listNode *ln; @@ -1502,6 +1658,8 @@ static void beforeSleep(struct aeEventLoop *eventLoop) { processInputBuffer(c); } } + /* Write the AOF buffer on disk */ + 
flushAppendOnlyFile(); } static void createSharedObjects(void) { @@ -1542,11 +1700,13 @@ static void createSharedObjects(void) { shared.select8 = createStringObject("select 8\r\n",10); shared.select9 = createStringObject("select 9\r\n",10); shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13); + shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14); shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); shared.mbulk3 = createStringObject("*3\r\n",4); + shared.mbulk4 = createStringObject("*4\r\n",4); for (j = 0; j < REDIS_SHARED_INTEGERS; j++) { shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j); shared.integers[j]->encoding = REDIS_ENCODING_INT; @@ -1577,7 +1737,8 @@ static void initServerConfig() { server.glueoutputbuf = 1; server.daemonize = 0; server.appendonly = 0; - server.appendfsync = APPENDFSYNC_ALWAYS; + server.appendfsync = APPENDFSYNC_EVERYSEC; + server.no_appendfsync_on_rewrite = 0; server.lastfsync = time(NULL); server.appendfd = -1; server.appendseldb = -1; /* Make sure the first time will not match */ @@ -1585,7 +1746,6 @@ static void initServerConfig() { server.dbfilename = zstrdup("dump.rdb"); server.appendfilename = zstrdup("appendonly.aof"); server.requirepass = NULL; - server.shareobjects = 0; server.rdbcompression = 1; server.activerehashing = 1; server.maxclients = 0; @@ -1600,6 +1760,9 @@ static void initServerConfig() { server.vm_blocked_clients = 0; server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES; server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE; + server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES; + server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE; + server.shutdown_asap = 0; resetServerSaveParams(); @@ -1648,7 +1811,8 @@ 
static void initServer() { for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); - server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL); + server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); + server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); if (server.vm_enabled) server.db[j].io_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; @@ -1661,6 +1825,7 @@ static void initServer() { server.bgsavechildpid = -1; server.bgrewritechildpid = -1; server.bgrewritebuf = sdsempty(); + server.aofbuf = sdsempty(); server.lastsave = time(NULL); server.dirty = 0; server.stat_numcommands = 0; @@ -1802,7 +1967,7 @@ static void loadServerConfig(char *filename) { } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) { server.maxclients = atoi(argv[1]); } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) { - server.maxmemory = strtoll(argv[1], NULL, 10); + server.maxmemory = memtoll(argv[1],NULL); } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) { server.masterhost = sdsnew(argv[1]); server.masterport = atoi(argv[2]); @@ -1813,10 +1978,6 @@ static void loadServerConfig(char *filename) { if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) { - if ((server.shareobjects = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) { if ((server.rdbcompression = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -1833,6 +1994,14 @@ static void loadServerConfig(char *filename) { if ((server.appendonly = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) { + zfree(server.appendfilename); + 
server.appendfilename = zstrdup(argv[1]); + } else if (!strcasecmp(argv[0],"no-appendfsync-on-rewrite") + && argc == 2) { + if ((server.no_appendfsync_on_rewrite= yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) { if (!strcasecmp(argv[1],"no")) { server.appendfsync = APPENDFSYNC_NO; @@ -1860,19 +2029,21 @@ static void loadServerConfig(char *filename) { zfree(server.vm_swap_file); server.vm_swap_file = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) { - server.vm_max_memory = strtoll(argv[1], NULL, 10); + server.vm_max_memory = memtoll(argv[1],NULL); } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) { - server.vm_page_size = strtoll(argv[1], NULL, 10); + server.vm_page_size = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) { - server.vm_pages = strtoll(argv[1], NULL, 10); + server.vm_pages = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) { server.vm_max_threads = strtoll(argv[1], NULL, 10); } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2){ - server.hash_max_zipmap_entries = strtol(argv[1], NULL, 10); + server.hash_max_zipmap_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2){ - server.hash_max_zipmap_value = strtol(argv[1], NULL, 10); - } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) { - server.vm_max_threads = strtoll(argv[1], NULL, 10); + server.hash_max_zipmap_value = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){ + server.list_max_ziplist_entries = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2){ + server.list_max_ziplist_value = memtoll(argv[1], NULL); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -1916,6 +2087,9 @@ static void 
freeClient(redisClient *c) { if (c->flags & REDIS_BLOCKED) unblockClientWaitingData(c); + /* UNWATCH all the keys */ + unwatchAllKeys(c); + listRelease(c->watched_keys); /* Unsubscribe from all the pubsub channels */ pubsubUnsubscribeAllChannels(c,0); pubsubUnsubscribeAllPatterns(c,0); @@ -1931,7 +2105,8 @@ static void freeClient(redisClient *c) { ln = listSearchKey(server.clients,c); redisAssert(ln != NULL); listDelNode(server.clients,ln); - /* Remove from the list of clients waiting for swapped keys */ + /* Remove from the list of clients that are now ready to be restarted + * after waiting for swapped keys */ if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) { ln = listSearchKey(server.io_ready_clients,c); if (ln) { @@ -1939,6 +2114,7 @@ static void freeClient(redisClient *c) { server.vm_blocked_clients--; } } + /* Remove from the list of clients waiting for swapped keys */ while (server.vm_enabled && listLength(c->io_keys)) { ln = listFirst(c->io_keys); dontWaitForSwappedKey(c,ln->value); @@ -2136,13 +2312,29 @@ static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int } } +static int qsortRedisCommands(const void *r1, const void *r2) { + return strcasecmp( + ((struct redisCommand*)r1)->name, + ((struct redisCommand*)r2)->name); +} + +static void sortCommandTable() { + /* Copy and sort the read-only version of the command table */ + commandTable = (struct redisCommand*)malloc(sizeof(readonlyCommandTable)); + memcpy(commandTable,readonlyCommandTable,sizeof(readonlyCommandTable)); + qsort(commandTable, + sizeof(readonlyCommandTable)/sizeof(struct redisCommand), + sizeof(struct redisCommand),qsortRedisCommands); +} + static struct redisCommand *lookupCommand(char *name) { - int j = 0; - while(cmdTable[j].name != NULL) { - if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j]; - j++; - } - return NULL; + struct redisCommand tmp = {name,NULL,0,0,NULL,0,0,0}; + return bsearch( + &tmp, + commandTable, + 
sizeof(readonlyCommandTable)/sizeof(struct redisCommand), + sizeof(struct redisCommand), + qsortRedisCommands); } /* resetClient prepare the client to process the next command */ @@ -2166,7 +2358,7 @@ static void call(redisClient *c, struct redisCommand *cmd) { listLength(server.slaves)) replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc); if (listLength(server.monitors)) - replicationFeedSlaves(server.monitors,c->db->id,c->argv,c->argc); + replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); server.stat_numcommands++; } @@ -2335,12 +2527,15 @@ static int processCommand(redisClient *c) { } /* Exec the command */ - if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) { + if (c->flags & REDIS_MULTI && + cmd->proc != execCommand && cmd->proc != discardCommand && + cmd->proc != multiCommand && cmd->proc != watchCommand) + { queueMultiCommand(c,cmd); addReply(c,shared.queued); } else { if (server.vm_enabled && server.vm_max_threads > 0 && - blockClientOnSwappedKeys(cmd,c)) return 1; + blockClientOnSwappedKeys(c,cmd)) return 1; call(c,cmd); } @@ -2421,6 +2616,64 @@ static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int arg if (outv != static_outv) zfree(outv); } +static sds sdscatrepr(sds s, char *p, size_t len) { /* Append the 'len' bytes at 'p' to 's' as a double-quoted, escaped string and return the resulting sds */ + s = sdscatlen(s,"\"",1); + while(len--) { + switch(*p) { + case '\\': + case '"': + s = sdscatprintf(s,"\\%c",*p); + break; + case '\n': s = sdscatlen(s,"\\n",2); break; /* every escape below is two bytes: the backslash plus its letter */ + case '\r': s = sdscatlen(s,"\\r",2); break; + case '\t': s = sdscatlen(s,"\\t",2); break; + case '\a': s = sdscatlen(s,"\\a",2); break; + case '\b': s = sdscatlen(s,"\\b",2); break; + default: + if (isprint((unsigned char)*p)) /* cast: isprint() is undefined for negative char values */ + s = sdscatprintf(s,"%c",*p); + else + s = sdscatprintf(s,"\\x%02x",(unsigned char)*p); + break; + } + p++; + } + return sdscatlen(s,"\"",1); +} + +static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) { + listNode *ln; + listIter li; + int j; + sds cmdrepr = 
sdsnew("+"); + robj *cmdobj; + struct timeval tv; + + gettimeofday(&tv,NULL); + cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); /* zero-pad the microseconds to six digits so the value reads as a decimal fraction of a second */ + if (dictid != 0) cmdrepr = sdscatprintf(cmdrepr,"(db %d) ", dictid); + + for (j = 0; j < argc; j++) { + if (argv[j]->encoding == REDIS_ENCODING_INT) { + cmdrepr = sdscatprintf(cmdrepr, "%ld", (long)argv[j]->ptr); + } else { + cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, + sdslen(argv[j]->ptr)); + } + if (j != argc-1) + cmdrepr = sdscatlen(cmdrepr," ",1); + } + cmdrepr = sdscatlen(cmdrepr,"\r\n",2); + cmdobj = createObject(REDIS_STRING,cmdrepr); + + listRewind(monitors,&li); + while((ln = listNext(&li))) { + redisClient *monitor = ln->value; + addReply(monitor,cmdobj); + } + decrRefCount(cmdobj); +} + static void processInputBuffer(redisClient *c) { again: /* Before to process the input buffer, make sure the client is not @@ -2546,7 +2799,7 @@ static void *dupClientReplyValue(void *o) { } static int listMatchObjects(void *a, void *b) { - return compareStringObjects(a,b) == 0; + return equalStringObjects(a,b); } static redisClient *createClient(int fd) { @@ -2572,9 +2825,10 @@ static redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blockingkeys = NULL; - c->blockingkeysnum = 0; + c->blocking_keys = NULL; + c->blocking_keys_num = 0; c->io_keys = listCreate(); + c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); @@ -2618,21 +2872,6 @@ static void addReplyDouble(redisClient *c, double d) { (unsigned long) strlen(buf),buf)); } -static void addReplyLong(redisClient *c, long l) { - char buf[128]; - size_t len; - - if (l == 0) { - addReply(c,shared.czero); - return; - } else if (l == 1) { - addReply(c,shared.cone); - return; - } - len = snprintf(buf,sizeof(buf),":%ld\r\n",l); - 
addReplySds(c,sdsnewlen(buf,len)); -} - static void addReplyLongLong(redisClient *c, long long ll) { char buf[128]; size_t len; @@ -2644,8 +2883,11 @@ static void addReplyLongLong(redisClient *c, long long ll) { addReply(c,shared.cone); return; } - len = snprintf(buf,sizeof(buf),":%lld\r\n",ll); - addReplySds(c,sdsnewlen(buf,len)); + buf[0] = ':'; + len = ll2string(buf+1,sizeof(buf)-1,ll); + buf[len+1] = '\r'; + buf[len+2] = '\n'; + addReplySds(c,sdsnewlen(buf,len+3)); } static void addReplyUlong(redisClient *c, unsigned long ul) { @@ -2664,7 +2906,8 @@ static void addReplyUlong(redisClient *c, unsigned long ul) { } static void addReplyBulkLen(redisClient *c, robj *obj) { - size_t len; + size_t len, intlen; + char buf[128]; if (obj->encoding == REDIS_ENCODING_RAW) { len = sdslen(obj->ptr); @@ -2681,7 +2924,11 @@ static void addReplyBulkLen(redisClient *c, robj *obj) { len++; } } - addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len)); + buf[0] = '$'; + intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len); + buf[intlen+1] = '\r'; + buf[intlen+2] = '\n'; + addReplySds(c,sdsnewlen(buf,intlen+3)); } static void addReplyBulk(redisClient *c, robj *obj) { @@ -2690,6 +2937,12 @@ static void addReplyBulk(redisClient *c, robj *obj) { addReply(c,shared.crlf); } +static void addReplyBulkSds(redisClient *c, sds s) { + robj *o = createStringObject(s, sdslen(s)); + addReplyBulk(c,o); + decrRefCount(o); +} + /* In the CONFIG command we need to add vanilla C string as bulk replies */ static void addReplyBulkCString(redisClient *c, char *s) { if (s == NULL) { @@ -2749,12 +3002,9 @@ static robj *createObject(int type, void *ptr) { listDelNode(server.objfreelist,head); if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); } else { - if (server.vm_enabled) { + if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); - o = zmalloc(sizeof(*o)); - } else { - o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM)); - } + o = 
zmalloc(sizeof(*o)); } o->type = type; o->encoding = REDIS_ENCODING_RAW; @@ -2762,10 +3012,10 @@ static robj *createObject(int type, void *ptr) { o->refcount = 1; if (server.vm_enabled) { /* Note that this code may run in the context of an I/O thread - * and accessing to server.unixtime in theory is an error + * and accessing server.lruclock in theory is an error * (no locks). But in practice this is safe, and even if we read - * garbage Redis will not fail, as it's just a statistical info */ - o->vm.atime = server.unixtime; + * garbage Redis will not fail. */ + o->lru = server.lruclock; o->storage = REDIS_VM_MEMORY; } return o; @@ -2781,12 +3031,12 @@ static robj *createStringObjectFromLongLong(long long value) { incrRefCount(shared.integers[value]); o = shared.integers[value]; } else { - o = createObject(REDIS_STRING, NULL); if (value >= LONG_MIN && value <= LONG_MAX) { + o = createObject(REDIS_STRING, NULL); o->encoding = REDIS_ENCODING_INT; o->ptr = (void*)((long)value); } else { - o->ptr = sdscatprintf(sdsempty(),"%lld",value); + o = createObject(REDIS_STRING,sdsfromlonglong(value)); } } return o; @@ -2799,9 +3049,17 @@ static robj *dupStringObject(robj *o) { static robj *createListObject(void) { list *l = listCreate(); - + robj *o = createObject(REDIS_LIST,l); listSetFreeMethod(l,decrRefCount); - return createObject(REDIS_LIST,l); + o->encoding = REDIS_ENCODING_LIST; + return o; +} + +static robj *createZiplistObject(void) { + unsigned char *zl = ziplistNew(); + robj *o = createObject(REDIS_LIST,zl); + o->encoding = REDIS_ENCODING_ZIPLIST; + return o; } static robj *createSetObject(void) { @@ -2834,7 +3092,16 @@ static void freeStringObject(robj *o) { } static void freeListObject(robj *o) { - listRelease((list*) o->ptr); + switch (o->encoding) { + case REDIS_ENCODING_LIST: + listRelease((list*) o->ptr); + break; + case REDIS_ENCODING_ZIPLIST: + zfree(o->ptr); + break; + default: + redisPanic("Unknown list encoding type"); + } } static void freeSetObject(robj 
*o) { @@ -2870,28 +3137,31 @@ static void incrRefCount(robj *o) { static void decrRefCount(void *obj) { robj *o = obj; - if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0"); - /* Object is a key of a swapped out value, or in the process of being - * loaded. */ + /* Object is a swapped out value, or in the process of being loaded. */ if (server.vm_enabled && (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING)) { - if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(obj); - redisAssert(o->type == REDIS_STRING); - freeStringObject(o); - vmMarkPagesFree(o->vm.page,o->vm.usedpages); - pthread_mutex_lock(&server.obj_freelist_mutex); - if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX || - !listAddNodeHead(server.objfreelist,o)) - zfree(o); - pthread_mutex_unlock(&server.obj_freelist_mutex); + vmpointer *vp = obj; + if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(o); + vmMarkPagesFree(vp->page,vp->usedpages); server.vm_stats_swapped_objects--; + zfree(vp); return; } - /* Object is in memory, or in the process of being swapped out. */ + + if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0"); + /* Object is in memory, or in the process of being swapped out. + * + * If the object is being swapped out, abort the operation on + * decrRefCount even if the refcount does not drop to 0: the object + * is referenced at least two times, as value of the key AND as + * job->val in the iojob. So if we don't invalidate the iojob, when it is + * done but the relevant key was removed in the meantime, the + * complete jobs handler will not find the key about the job and the + * assert will fail. 
*/ + if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING) + vmCancelThreadedIOJob(o); if (--(o->refcount) == 0) { - if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING) - vmCancelThreadedIOJob(obj); switch(o->type) { case REDIS_STRING: freeStringObject(o); break; case REDIS_LIST: freeListObject(o); break; @@ -2908,63 +3178,6 @@ static void decrRefCount(void *obj) { } } -static robj *lookupKey(redisDb *db, robj *key) { - dictEntry *de = dictFind(db->dict,key); - if (de) { - robj *key = dictGetEntryKey(de); - robj *val = dictGetEntryVal(de); - - if (server.vm_enabled) { - if (key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) - { - /* If we were swapping the object out, stop it, this key - * was requested. */ - if (key->storage == REDIS_VM_SWAPPING) - vmCancelThreadedIOJob(key); - /* Update the access time of the key for the aging algorithm. */ - key->vm.atime = server.unixtime; - } else { - int notify = (key->storage == REDIS_VM_LOADING); - - /* Our value was swapped on disk. Bring it at home. */ - redisAssert(val == NULL); - val = vmLoadObject(key); - dictGetEntryVal(de) = val; - - /* Clients blocked by the VM subsystem may be waiting for - * this key... 
*/ - if (notify) handleClientsBlockedOnSwappedKey(db,key); - } - } - return val; - } else { - return NULL; - } -} - -static robj *lookupKeyRead(redisDb *db, robj *key) { - expireIfNeeded(db,key); - return lookupKey(db,key); -} - -static robj *lookupKeyWrite(redisDb *db, robj *key) { - deleteIfVolatile(db,key); - return lookupKey(db,key); -} - -static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) { - robj *o = lookupKeyRead(c->db, key); - if (!o) addReply(c,reply); - return o; -} - -static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) { - robj *o = lookupKeyWrite(c->db, key); - if (!o) addReply(c,reply); - return o; -} - static int checkType(redisClient *c, robj *o, int type) { if (o->type != type) { addReply(c,shared.wrongtypeerr); @@ -2973,21 +3186,6 @@ static int checkType(redisClient *c, robj *o, int type) { return 0; } -static int deleteKey(redisDb *db, robj *key) { - int retval; - - /* We need to protect key from destruction: after the first dictDelete() - * it may happen that 'key' is no longer valid if we don't increment - * it's count. This may happen when we get the object reference directly - * from the hash table with dictRandomKey() or dict iterators */ - incrRefCount(key); - if (dictSize(db->expires)) dictDelete(db->expires,key); - retval = dictDelete(db->dict,key); - decrRefCount(key); - - return retval == DICT_OK; -} - /* Check if the nul-terminated string 's' can be represented by a long * (that is, is a number that fits into long without any other space or * character before or after the digits). 
@@ -3001,7 +3199,7 @@ static int isStringRepresentableAsLong(sds s, long *longval) { value = strtol(s, &endptr, 10); if (endptr[0] != '\0') return REDIS_ERR; - slen = snprintf(buf,32,"%ld",value); + slen = ll2string(buf,32,value); /* If the number converted back into a string is not identical * then it's not possible to encode the string as integer */ @@ -3054,7 +3252,7 @@ static robj *getDecodedObject(robj *o) { if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) { char buf[32]; - snprintf(buf,32,"%ld",(long)o->ptr); + ll2string(buf,32,(long)o->ptr); dec = createStringObject(buf,strlen(buf)); return dec; } else { @@ -3064,7 +3262,7 @@ static robj *getDecodedObject(robj *o) { /* Compare two string objects via strcmp() or alike. * Note that the objects may be integer-encoded. In such a case we - * use snprintf() to get a string representation of the numbers on the stack + * use ll2string() to get a string representation of the numbers on the stack * and compare the strings, it's much faster than calling getDecodedObject(). * * Important note: if objects are not integer encoded, but binary-safe strings, @@ -3077,14 +3275,14 @@ static int compareStringObjects(robj *a, robj *b) { if (a == b) return 0; if (a->encoding != REDIS_ENCODING_RAW) { - snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr); + ll2string(bufa,sizeof(bufa),(long) a->ptr); astr = bufa; bothsds = 0; } else { astr = a->ptr; } if (b->encoding != REDIS_ENCODING_RAW) { - snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr); + ll2string(bufb,sizeof(bufb),(long) b->ptr); bstr = bufb; bothsds = 0; } else { @@ -3093,6 +3291,18 @@ static int compareStringObjects(robj *a, robj *b) { return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr); } +/* Equal string objects return 1 if the two objects are the same from the + * point of view of a string comparison, otherwise 0 is returned. 
Note that + * this function is faster than checking for (compareStringObjects(a,b) == 0) + * because it can perform some more optimization. */ +static int equalStringObjects(robj *a, robj *b) { + if (a->encoding != REDIS_ENCODING_RAW && b->encoding != REDIS_ENCODING_RAW){ + return a->ptr == b->ptr; + } else { + return compareStringObjects(a,b) == 0; + } +} + static size_t stringObjectLen(robj *o) { redisAssert(o->type == REDIS_STRING); if (o->encoding == REDIS_ENCODING_RAW) { @@ -3100,7 +3310,7 @@ static size_t stringObjectLen(robj *o) { } else { char buf[32]; - return snprintf(buf,32,"%ld",(long)o->ptr); + return ll2string(buf,32,(long)o->ptr); } } @@ -3118,7 +3328,7 @@ static int getDoubleFromObject(robj *o, double *target) { } else if (o->encoding == REDIS_ENCODING_INT) { value = (long)o->ptr; } else { - redisAssert(1 != 1); + redisPanic("Unknown string encoding"); } } @@ -3155,7 +3365,7 @@ static int getLongLongFromObject(robj *o, long long *target) { } else if (o->encoding == REDIS_ENCODING_INT) { value = (long)o->ptr; } else { - redisAssert(1 != 1); + redisPanic("Unknown string encoding"); } } @@ -3195,6 +3405,134 @@ static int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const return REDIS_OK; } +/* =========================== Keyspace access API ========================== */ + +static robj *lookupKey(redisDb *db, robj *key) { + dictEntry *de = dictFind(db->dict,key->ptr); + if (de) { + robj *val = dictGetEntryVal(de); + + if (server.vm_enabled) { + if (val->storage == REDIS_VM_MEMORY || + val->storage == REDIS_VM_SWAPPING) + { + /* If we were swapping the object out, cancel the operation */ + if (val->storage == REDIS_VM_SWAPPING) + vmCancelThreadedIOJob(val); + /* Update the access time for the aging algorithm. */ + val->lru = server.lruclock; + } else { + int notify = (val->storage == REDIS_VM_LOADING); + + /* Our value was swapped on disk. Bring it at home. 
*/ + redisAssert(val->type == REDIS_VMPOINTER); + val = vmLoadObject(val); + dictGetEntryVal(de) = val; + + /* Clients blocked by the VM subsystem may be waiting for + * this key... */ + if (notify) handleClientsBlockedOnSwappedKey(db,key); + } + } + return val; + } else { + return NULL; + } +} + +static robj *lookupKeyRead(redisDb *db, robj *key) { + expireIfNeeded(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyWrite(redisDb *db, robj *key) { + deleteIfVolatile(db,key); + touchWatchedKey(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) { + robj *o = lookupKeyRead(c->db, key); + if (!o) addReply(c,reply); + return o; +} + +static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) { + robj *o = lookupKeyWrite(c->db, key); + if (!o) addReply(c,reply); + return o; +} + +/* Add the key to the DB. If the key already exists REDIS_ERR is returned, + * otherwise REDIS_OK is returned, and the caller should increment the + * refcount of 'val'. */ +static int dbAdd(redisDb *db, robj *key, robj *val) { + /* Perform a lookup before adding the key, as we need to copy the + * key value. */ + if (dictFind(db->dict, key->ptr) != NULL) { + return REDIS_ERR; + } else { + sds copy = sdsdup(key->ptr); + dictAdd(db->dict, copy, val); + return REDIS_OK; + } +} + +/* If the key does not exist, this is just like dbAdd(). Otherwise + * the value associated to the key is replaced with the new one. + * + * On update (key already existed) 0 is returned. Otherwise 1. */ +static int dbReplace(redisDb *db, robj *key, robj *val) { + if (dictFind(db->dict,key->ptr) == NULL) { + sds copy = sdsdup(key->ptr); + dictAdd(db->dict, copy, val); + return 1; + } else { + dictReplace(db->dict, key->ptr, val); + return 0; + } +} + +static int dbExists(redisDb *db, robj *key) { + return dictFind(db->dict,key->ptr) != NULL; +} + +/* Return a random key, in form of a Redis object. 
+ * If there are no keys, NULL is returned. + * + * The function makes sure to return keys not already expired. */ +static robj *dbRandomKey(redisDb *db) { + struct dictEntry *de; + + while(1) { + sds key; + robj *keyobj; + + de = dictGetRandomKey(db->dict); + if (de == NULL) return NULL; + + key = dictGetEntryKey(de); + keyobj = createStringObject(key,sdslen(key)); + if (dictFind(db->expires,key)) { + if (expireIfNeeded(db,keyobj)) { + decrRefCount(keyobj); + continue; /* search for another key. This expired. */ + } + } + return keyobj; + } +} + +/* Delete a key, value, and associated expiration entry if any, from the DB */ +static int dbDelete(redisDb *db, robj *key) { + int retval; + + if (dictSize(db->expires)) dictDelete(db->expires,key->ptr); + retval = dictDelete(db->dict,key->ptr); + + return retval == DICT_OK; +} + /*============================ RDB saving/loading =========================== */ static int rdbSaveType(FILE *fp, unsigned char type) { @@ -3231,22 +3569,12 @@ static int rdbSaveLen(FILE *fp, uint32_t len) { return 0; } -/* String objects in the form "2391" "-100" without any space and with a - * range of values that can fit in an 8, 16 or 32 bit signed value can be - * encoded as integers to save space */ -static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { - long long value; - char *endptr, buf[32]; - - /* Check if it's possible to encode this value as a number */ - value = strtoll(s, &endptr, 10); - if (endptr[0] != '\0') return 0; - snprintf(buf,32,"%lld",value); - - /* If the number converted back into a string is not identical - * then it's not possible to encode the string as integer */ - if (strlen(buf) != len || memcmp(buf,s,len)) return 0; - +/* Encode 'value' as an integer if possible (if the integer fits the + * supported range). If the function successfully encoded the integer + * then the (up to 5 bytes) encoded representation is written in the + * string pointed to by 'enc' and the length is returned. 
Otherwise + * 0 is returned. */ +static int rdbEncodeInteger(long long value, unsigned char *enc) { /* Finally check if it fits in our ranges */ if (value >= -(1<<7) && value <= (1<<7)-1) { enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8; @@ -3269,6 +3597,25 @@ static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { } } +/* String objects in the form "2391" "-100" without any space and with a + * range of values that can fit in an 8, 16 or 32 bit signed value can be + * encoded as integers to save space */ +static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { + long long value; + char *endptr, buf[32]; + + /* Check if it's possible to encode this value as a number */ + value = strtoll(s, &endptr, 10); + if (endptr[0] != '\0') return 0; + ll2string(buf,32,value); + + /* If the number converted back into a string is not identical + * then it's not possible to encode the string as integer */ + if (strlen(buf) != len || memcmp(buf,s,len)) return 0; + + return rdbEncodeInteger(value,enc); +} + static int rdbSaveLzfStringObject(FILE *fp, unsigned char *s, size_t len) { size_t comprlen, outlen; unsigned char byte; @@ -3328,23 +3675,32 @@ static int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) { return 0; } +/* Save a long long value as either an encoded string or a string. */ +static int rdbSaveLongLongAsStringObject(FILE *fp, long long value) { + unsigned char buf[32]; + int enclen = rdbEncodeInteger(value,buf); + if (enclen > 0) { + if (fwrite(buf,enclen,1,fp) == 0) return -1; + } else { + /* Encode as string */ + enclen = ll2string((char*)buf,32,value); + redisAssert(enclen < 32); + if (rdbSaveLen(fp,enclen) == -1) return -1; + if (fwrite(buf,enclen,1,fp) == 0) return -1; + } + return 0; +} + /* Like rdbSaveStringObjectRaw() but handle encoded objects */ static int rdbSaveStringObject(FILE *fp, robj *obj) { - int retval; - - /* Avoid incr/decr ref count business when possible. 
- * This plays well with copy-on-write given that we are probably - * in a child process (BGSAVE). Also this makes sure key objects - * of swapped objects are not incRefCount-ed (an assert does not allow - * this in order to avoid bugs) */ - if (obj->encoding != REDIS_ENCODING_RAW) { - obj = getDecodedObject(obj); - retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); - decrRefCount(obj); + /* Avoid decoding the object and then encoding it again if the + * object is already integer encoded. */ + if (obj->encoding == REDIS_ENCODING_INT) { + return rdbSaveLongLongAsStringObject(fp,(long)obj->ptr); } else { - retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); + redisAssert(obj->encoding == REDIS_ENCODING_RAW); + return rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); } - return retval; } /* Save a double value. Doubles are saved as strings prefixed by an unsigned @@ -3366,7 +3722,23 @@ static int rdbSaveDoubleValue(FILE *fp, double val) { len = 1; buf[0] = (val < 0) ? 255 : 254; } else { - snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val); +#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL) + /* Check if the float is in a safe range to be cast into a + * long long. We are assuming that long long is 64 bit here. + * Also we are assuming that there are no implementations around where + * double has precision < 52 bit. + * + * Under these assumptions we test if a double is inside an interval + * where casting to long long is safe. Then using two castings we + * make sure the decimal part is zero. If all this is true we use + * an integer printing function that is much faster. 
*/ + double min = -4503599627370495; /* -((2^52)-1) */ + double max = 4503599627370496; /* 2^52 */ + if (val > min && val < max && val == ((double)((long long)val))) + ll2string((char*)buf+1,sizeof(buf)-1,(long long)val); /* output starts at buf+1, so only sizeof(buf)-1 bytes are available (same bound as the snprintf below) */ + else +#endif + snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val); buf[0] = strlen((char*)buf+1); len = buf[0]+1; } @@ -3381,16 +3753,37 @@ static int rdbSaveObject(FILE *fp, robj *o) { if (rdbSaveStringObject(fp,o) == -1) return -1; } else if (o->type == REDIS_LIST) { /* Save a list value */ - list *list = o->ptr; - listIter li; - listNode *ln; - - if (rdbSaveLen(fp,listLength(list)) == -1) return -1; - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *eleobj = listNodeValue(ln); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + if (rdbSaveLen(fp,ziplistLen(o->ptr)) == -1) return -1; + p = ziplistIndex(o->ptr,0); + while(ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + if (rdbSaveRawString(fp,vstr,vlen) == -1) + return -1; + } else { + if (rdbSaveLongLongAsStringObject(fp,vlong) == -1) + return -1; + } + p = ziplistNext(o->ptr,p); + } + } else if (o->encoding == REDIS_ENCODING_LIST) { + list *list = o->ptr; + listIter li; + listNode *ln; - if (rdbSaveStringObject(fp,eleobj) == -1) return -1; + if (rdbSaveLen(fp,listLength(list)) == -1) return -1; + listRewind(list,&li); + while((ln = listNext(&li))) { + robj *eleobj = listNodeValue(ln); + if (rdbSaveStringObject(fp,eleobj) == -1) return -1; + } + } else { + redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { /* Save a set value */ @@ -3509,9 +3902,12 @@ static int rdbSave(char *filename) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key = dictGetEntryKey(de); - robj *o = dictGetEntryVal(de); - time_t expiretime = getExpire(db,key); + sds keystr = dictGetEntryKey(de); + robj key, *o = dictGetEntryVal(de); + time_t expiretime; + + 
initStaticStringObject(key,keystr); + expiretime = getExpire(db,&key); /* Save the expire time */ if (expiretime != -1) { @@ -3522,20 +3918,20 @@ static int rdbSave(char *filename) { } /* Save the key and associated value. This requires special * handling if the value is swapped out. */ - if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) { + if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || + o->storage == REDIS_VM_SWAPPING) { /* Save type, key, value */ if (rdbSaveType(fp,o->type) == -1) goto werr; - if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,o) == -1) goto werr; } else { /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ robj *po; /* Get a preview of the object in memory */ - po = vmPreviewObject(key); + po = vmPreviewObject(o); /* Save type, key, value */ - if (rdbSaveType(fp,key->vtype) == -1) goto werr; - if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (rdbSaveType(fp,po->type) == -1) goto werr; + if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,po) == -1) goto werr; /* Remove the loaded object from memory */ decrRefCount(po); @@ -3650,7 +4046,11 @@ static uint32_t rdbLoadLen(FILE *fp, int *isencoded) { } } -static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { +/* Load an integer-encoded object from file 'fp', with the specified + * encoding type 'enctype'. If encode is true the function may return + * an integer-encoded object as reply, otherwise the returned object + * will always be encoded as a raw string. 
*/ +static robj *rdbLoadIntegerObject(FILE *fp, int enctype, int encode) { unsigned char enc[4]; long long val; @@ -3671,7 +4071,10 @@ static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { val = 0; /* anti-warning */ redisPanic("Unknown RDB integer encoding type"); } - return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val)); + if (encode) + return createStringObjectFromLongLong(val); + else + return createObject(REDIS_STRING,sdsfromlonglong(val)); } static robj *rdbLoadLzfStringObject(FILE*fp) { @@ -3693,7 +4096,7 @@ err: return NULL; } -static robj *rdbLoadStringObject(FILE*fp) { +static robj *rdbGenericLoadStringObject(FILE*fp, int encode) { int isencoded; uint32_t len; sds val; @@ -3704,7 +4107,7 @@ static robj *rdbLoadStringObject(FILE*fp) { case REDIS_RDB_ENC_INT8: case REDIS_RDB_ENC_INT16: case REDIS_RDB_ENC_INT32: - return rdbLoadIntegerObject(fp,len); + return rdbLoadIntegerObject(fp,len,encode); case REDIS_RDB_ENC_LZF: return rdbLoadLzfStringObject(fp); default: @@ -3721,6 +4124,14 @@ static robj *rdbLoadStringObject(FILE*fp) { return createObject(REDIS_STRING,val); } +static robj *rdbLoadStringObject(FILE *fp) { + return rdbGenericLoadStringObject(fp,0); +} + +static robj *rdbLoadEncodedStringObject(FILE *fp) { + return rdbGenericLoadStringObject(fp,1); +} + /* For information about double serialization check rdbSaveDoubleValue() */ static int rdbLoadDoubleValue(FILE *fp, double *val) { char buf[128]; @@ -3742,34 +4153,60 @@ static int rdbLoadDoubleValue(FILE *fp, double *val) { /* Load a Redis object of the specified type from the specified file. * On success a newly allocated object is returned, otherwise NULL. 
*/ static robj *rdbLoadObject(int type, FILE *fp) { - robj *o; + robj *o, *ele, *dec; + size_t len; redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp)); if (type == REDIS_STRING) { /* Read string value */ - if ((o = rdbLoadStringObject(fp)) == NULL) return NULL; + if ((o = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; o = tryObjectEncoding(o); - } else if (type == REDIS_LIST || type == REDIS_SET) { - /* Read list/set value */ - uint32_t listlen; + } else if (type == REDIS_LIST) { + /* Read list value */ + if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + + /* Use a real list when there are too many entries */ + if (len > server.list_max_ziplist_entries) { + o = createListObject(); + } else { + o = createZiplistObject(); + } - if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; - o = (type == REDIS_LIST) ? createListObject() : createSetObject(); + /* Load every single element of the list */ + while(len--) { + if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + + /* If we are using a ziplist and the value is too big, convert + * the object to a real list. 
*/ + if (o->encoding == REDIS_ENCODING_ZIPLIST && + ele->encoding == REDIS_ENCODING_RAW && + sdslen(ele->ptr) > server.list_max_ziplist_value) + listTypeConvert(o,REDIS_ENCODING_LIST); + + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + dec = getDecodedObject(ele); + o->ptr = ziplistPush(o->ptr,dec->ptr,sdslen(dec->ptr),REDIS_TAIL); + decrRefCount(dec); + decrRefCount(ele); + } else { + ele = tryObjectEncoding(ele); + listAddNodeTail(o->ptr,ele); + incrRefCount(ele); + } + } + } else if (type == REDIS_SET) { + /* Read list/set value */ + if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + o = createSetObject(); /* It's faster to expand the dict to the right size asap in order * to avoid rehashing */ - if (type == REDIS_SET && listlen > DICT_HT_INITIAL_SIZE) - dictExpand(o->ptr,listlen); + if (len > DICT_HT_INITIAL_SIZE) + dictExpand(o->ptr,len); /* Load every single element of the list/set */ - while(listlen--) { - robj *ele; - - if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL; + while(len--) { + if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; ele = tryObjectEncoding(ele); - if (type == REDIS_LIST) { - listAddNodeTail((list*)o->ptr,ele); - } else { - dictAdd((dict*)o->ptr,ele,NULL); - } + dictAdd((dict*)o->ptr,ele,NULL); } } else if (type == REDIS_ZSET) { /* Read list/set value */ @@ -3784,7 +4221,7 @@ static robj *rdbLoadObject(int type, FILE *fp) { robj *ele; double *score = zmalloc(sizeof(double)); - if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL; + if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; ele = tryObjectEncoding(ele); if (rdbLoadDoubleValue(fp,score) == -1) return NULL; dictAdd(zs->dict,ele,score); @@ -3837,14 +4274,12 @@ static robj *rdbLoadObject(int type, FILE *fp) { static int rdbLoad(char *filename) { FILE *fp; - robj *keyobj = NULL; uint32_t dbid; int type, retval, rdbver; - dict *d = server.db[0].dict; + int swap_all_values = 0; redisDb *db = server.db+0; char buf[1024]; - time_t 
expiretime = -1, now = time(NULL); - long long loadedkeys = 0; + time_t expiretime, now = time(NULL); fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; @@ -3862,8 +4297,10 @@ static int rdbLoad(char *filename) { return REDIS_ERR; } while(1) { - robj *o; + robj *key, *val; + int force_swapout; + expiretime = -1; /* Read type. */ if ((type = rdbLoadType(fp)) == -1) goto eoferr; if (type == REDIS_EXPIRETIME) { @@ -3881,45 +4318,110 @@ static int rdbLoad(char *filename) { exit(1); } db = server.db+dbid; - d = db->dict; continue; } /* Read key */ - if ((keyobj = rdbLoadStringObject(fp)) == NULL) goto eoferr; + if ((key = rdbLoadStringObject(fp)) == NULL) goto eoferr; /* Read value */ - if ((o = rdbLoadObject(type,fp)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,fp)) == NULL) goto eoferr; + /* Check if the key already expired */ + if (expiretime != -1 && expiretime < now) { + decrRefCount(key); + decrRefCount(val); + continue; + } /* Add the new object in the hash table */ - retval = dictAdd(d,keyobj,o); - if (retval == DICT_ERR) { - redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr); + retval = dbAdd(db,key,val); + if (retval == REDIS_ERR) { + redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", key->ptr); exit(1); } /* Set the expire time if needed */ - if (expiretime != -1) { - setExpire(db,keyobj,expiretime); - /* Delete this key if already expired */ - if (expiretime < now) deleteKey(db,keyobj); - expiretime = -1; - } - keyobj = o = NULL; + if (expiretime != -1) setExpire(db,key,expiretime); + /* Handle swapping while loading big datasets when VM is on */ - loadedkeys++; - if (server.vm_enabled && (loadedkeys % 5000) == 0) { + + /* If we detecter we are hopeless about fitting something in memory + * we just swap every new key on disk. Directly... 
+ * Note that's important to check for this condition before resorting + * to random sampling, otherwise we may try to swap already + * swapped keys. */ + if (swap_all_values) { + dictEntry *de = dictFind(db->dict,key->ptr); + + /* de may be NULL since the key already expired */ + if (de) { + vmpointer *vp; + val = dictGetEntryVal(de); + + if (val->refcount == 1 && + (vp = vmSwapObjectBlocking(val)) != NULL) + dictGetEntryVal(de) = vp; + } + decrRefCount(key); + continue; + } + decrRefCount(key); + + /* Flush data on disk once 32 MB of additional RAM are used... */ + force_swapout = 0; + if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) + force_swapout = 1; + + /* If we have still some hope of having some value fitting memory + * then we try random sampling. */ + if (!swap_all_values && server.vm_enabled && force_swapout) { while (zmalloc_used_memory() > server.vm_max_memory) { if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } + if (zmalloc_used_memory() > server.vm_max_memory) + swap_all_values = 1; /* We are already using too much mem */ } } fclose(fp); return REDIS_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ - if (keyobj) decrRefCount(keyobj); redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); exit(1); return REDIS_ERR; /* Just to avoid warning */ } +/*================================== Shutdown =============================== */ +static int prepareForShutdown() { + redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); + /* Kill the saving child if there is a background saving in progress. + We want to avoid race conditions, for instance our saving child may + overwrite the synchronous saving did by SHUTDOWN. */ + if (server.bgsavechildpid != -1) { + redisLog(REDIS_WARNING,"There is a live saving child. 
Killing it!"); + kill(server.bgsavechildpid,SIGKILL); + rdbRemoveTempFile(server.bgsavechildpid); + } + if (server.appendonly) { + /* Append only file: fsync() the AOF and exit */ + aof_fsync(server.appendfd); + if (server.vm_enabled) unlink(server.vm_swap_file); + } else { + /* Snapshotting. Perform a SYNC SAVE and exit */ + if (rdbSave(server.dbfilename) == REDIS_OK) { + if (server.daemonize) + unlink(server.pidfile); + redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); + } else { + /* Ooops.. error saving! The best we can do is to continue + * operating. Note that if there was a background saving process, + * in the next cron() Redis will be notified that the background + * saving aborted, handling special stuff like slaves pending for + * synchronization... */ + redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); + return REDIS_ERR; + } + } + redisLog(REDIS_WARNING,"Server exit now, bye bye..."); + return REDIS_OK; +} + /*================================== Commands =============================== */ static void authCommand(redisClient *c) { @@ -3942,40 +4444,49 @@ static void echoCommand(redisClient *c) { /*=================================== Strings =============================== */ -static void setGenericCommand(redisClient *c, int nx) { +static void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire) { int retval; + long seconds = 0; /* initialized to avoid an harmness warning */ + + if (expire) { + if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK) + return; + if (seconds <= 0) { + addReplySds(c,sdsnew("-ERR invalid expire time in SETEX\r\n")); + return; + } + } - if (nx) deleteIfVolatile(c->db,c->argv[1]); - retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); - if (retval == DICT_ERR) { + touchWatchedKey(c->db,key); + if (nx) deleteIfVolatile(c->db,key); + retval = dbAdd(c->db,key,val); + if (retval == REDIS_ERR) { if (!nx) { - /* If the key is about a swapped value, we want 
a new key object - * to overwrite the old. So we delete the old key in the database. - * This will also make sure that swap pages about the old object - * will be marked as free. */ - if (server.vm_enabled && deleteIfSwapped(c->db,c->argv[1])) - incrRefCount(c->argv[1]); - dictReplace(c->db->dict,c->argv[1],c->argv[2]); - incrRefCount(c->argv[2]); + dbReplace(c->db,key,val); + incrRefCount(val); } else { addReply(c,shared.czero); return; } } else { - incrRefCount(c->argv[1]); - incrRefCount(c->argv[2]); + incrRefCount(val); } server.dirty++; - removeExpire(c->db,c->argv[1]); + removeExpire(c->db,key); + if (expire) setExpire(c->db,key,time(NULL)+seconds); addReply(c, nx ? shared.cone : shared.ok); } static void setCommand(redisClient *c) { - setGenericCommand(c,0); + setGenericCommand(c,0,c->argv[1],c->argv[2],NULL); } static void setnxCommand(redisClient *c) { - setGenericCommand(c,1); + setGenericCommand(c,1,c->argv[1],c->argv[2],NULL); +} + +static void setexCommand(redisClient *c) { + setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2]); } static int getGenericCommand(redisClient *c) { @@ -3999,11 +4510,7 @@ static void getCommand(redisClient *c) { static void getsetCommand(redisClient *c) { if (getGenericCommand(c) == REDIS_ERR) return; - if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) { - dictReplace(c->db->dict,c->argv[1],c->argv[2]); - } else { - incrRefCount(c->argv[1]); - } + dbReplace(c->db,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); server.dirty++; removeExpire(c->db,c->argv[1]); @@ -4049,17 +4556,9 @@ static void msetGenericCommand(redisClient *c, int nx) { } for (j = 1; j < c->argc; j += 2) { - int retval; - c->argv[j+1] = tryObjectEncoding(c->argv[j+1]); - retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]); - if (retval == DICT_ERR) { - dictReplace(c->db->dict,c->argv[j],c->argv[j+1]); - incrRefCount(c->argv[j+1]); - } else { - incrRefCount(c->argv[j]); - incrRefCount(c->argv[j+1]); - } + 
dbReplace(c->db,c->argv[j],c->argv[j+1]); + incrRefCount(c->argv[j+1]); removeExpire(c->db,c->argv[j]); } server.dirty += (c->argc-1)/2; @@ -4076,23 +4575,15 @@ static void msetnxCommand(redisClient *c) { static void incrDecrCommand(redisClient *c, long long incr) { long long value; - int retval; robj *o; o = lookupKeyWrite(c->db,c->argv[1]); - - if (getLongLongFromObjectOrReply(c, o, &value, NULL) != REDIS_OK) return; + if (o != NULL && checkType(c,o,REDIS_STRING)) return; + if (getLongLongFromObjectOrReply(c,o,&value,NULL) != REDIS_OK) return; value += incr; - o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value)); - o = tryObjectEncoding(o); - retval = dictAdd(c->db->dict,c->argv[1],o); - if (retval == DICT_ERR) { - dictReplace(c->db->dict,c->argv[1],o); - removeExpire(c->db,c->argv[1]); - } else { - incrRefCount(c->argv[1]); - } + o = createStringObjectFromLongLong(value); + dbReplace(c->db,c->argv[1],o); server.dirty++; addReply(c,shared.colon); addReply(c,o); @@ -4129,17 +4620,10 @@ static void appendCommand(redisClient *c) { o = lookupKeyWrite(c->db,c->argv[1]); if (o == NULL) { /* Create the key */ - retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); - incrRefCount(c->argv[1]); + retval = dbAdd(c->db,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); totlen = stringObjectLen(c->argv[2]); } else { - dictEntry *de; - - de = dictFind(c->db->dict,c->argv[1]); - assert(de != NULL); - - o = dictGetEntryVal(de); if (o->type != REDIS_STRING) { addReply(c,shared.wrongtypeerr); return; @@ -4151,7 +4635,7 @@ static void appendCommand(redisClient *c) { o = createStringObject(decoded->ptr, sdslen(decoded->ptr)); decrRefCount(decoded); - dictReplace(c->db->dict,c->argv[1],o); + dbReplace(c->db,c->argv[1],o); } /* APPEND! 
*/ if (c->argv[2]->encoding == REDIS_ENCODING_RAW) { @@ -4210,16 +4694,22 @@ static void delCommand(redisClient *c) { int deleted = 0, j; for (j = 1; j < c->argc; j++) { - if (deleteKey(c->db,c->argv[j])) { + if (dbDelete(c->db,c->argv[j])) { + touchWatchedKey(c->db,c->argv[j]); server.dirty++; deleted++; } } - addReplyLong(c,deleted); + addReplyLongLong(c,deleted); } static void existsCommand(redisClient *c) { - addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero); + expireIfNeeded(c->db,c->argv[1]); + if (dbExists(c->db,c->argv[1])) { + addReply(c, shared.cone); + } else { + addReply(c, shared.czero); + } } static void selectCommand(redisClient *c) { @@ -4233,20 +4723,15 @@ static void selectCommand(redisClient *c) { } static void randomkeyCommand(redisClient *c) { - dictEntry *de; + robj *key; - while(1) { - de = dictGetRandomKey(c->db->dict); - if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break; - } - if (de == NULL) { - addReply(c,shared.plus); - addReply(c,shared.crlf); - } else { - addReply(c,shared.plus); - addReply(c,dictGetEntryKey(de)); - addReply(c,shared.crlf); + if ((key = dbRandomKey(c->db)) == NULL) { + addReply(c,shared.nullbulk); + return; } + + addReplyBulk(c,key); + decrRefCount(key); } static void keysCommand(redisClient *c) { @@ -4261,15 +4746,17 @@ static void keysCommand(redisClient *c) { addReply(c,lenobj); decrRefCount(lenobj); while((de = dictNext(di)) != NULL) { - robj *keyobj = dictGetEntryKey(de); + sds key = dictGetEntryKey(de); + robj *keyobj; - sds key = keyobj->ptr; if ((pattern[0] == '*' && pattern[1] == '\0') || stringmatchlen(pattern,plen,key,sdslen(key),0)) { + keyobj = createStringObject(key,sdslen(key)); if (expireIfNeeded(c->db,keyobj) == 0) { addReplyBulk(c,keyobj); numkeys++; } + decrRefCount(keyobj); } } dictReleaseIterator(di); @@ -4333,40 +4820,9 @@ static void bgsaveCommand(redisClient *c) { } static void shutdownCommand(redisClient *c) { - redisLog(REDIS_WARNING,"User requested 
shutdown, saving DB..."); - /* Kill the saving child if there is a background saving in progress. - We want to avoid race conditions, for instance our saving child may - overwrite the synchronous saving did by SHUTDOWN. */ - if (server.bgsavechildpid != -1) { - redisLog(REDIS_WARNING,"There is a live saving child. Killing it!"); - kill(server.bgsavechildpid,SIGKILL); - rdbRemoveTempFile(server.bgsavechildpid); - } - if (server.appendonly) { - /* Append only file: fsync() the AOF and exit */ - fsync(server.appendfd); - if (server.vm_enabled) unlink(server.vm_swap_file); + if (prepareForShutdown() == REDIS_OK) exit(0); - } else { - /* Snapshotting. Perform a SYNC SAVE and exit */ - if (rdbSave(server.dbfilename) == REDIS_OK) { - if (server.daemonize) - unlink(server.pidfile); - redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); - redisLog(REDIS_WARNING,"Server exit now, bye bye..."); - if (server.vm_enabled) unlink(server.vm_swap_file); - exit(0); - } else { - /* Ooops.. error saving! The best we can do is to continue - * operating. Note that if there was a background saving process, - * in the next cron() Redis will be notified that the background - * saving aborted, handling special stuff like slaves pending for - * synchronization... */ - redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); - addReplySds(c, - sdsnew("-ERR can't quit, problems saving the DB\r\n")); - } - } + addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. 
Check logs.\r\n")); } static void renameGenericCommand(redisClient *c, int nx) { @@ -4383,17 +4839,16 @@ static void renameGenericCommand(redisClient *c, int nx) { incrRefCount(o); deleteIfVolatile(c->db,c->argv[2]); - if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) { + if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) { if (nx) { decrRefCount(o); addReply(c,shared.czero); return; } - dictReplace(c->db->dict,c->argv[2],o); - } else { - incrRefCount(c->argv[2]); + dbReplace(c->db,c->argv[2],o); } - deleteKey(c->db,c->argv[1]); + dbDelete(c->db,c->argv[1]); + touchWatchedKey(c->db,c->argv[2]); server.dirty++; addReply(c,nx ? shared.cone : shared.ok); } @@ -4437,40 +4892,265 @@ static void moveCommand(redisClient *c) { /* Try to add the element to the target DB */ deleteIfVolatile(dst,c->argv[1]); - if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) { + if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) { addReply(c,shared.czero); return; } - incrRefCount(c->argv[1]); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ - deleteKey(src,c->argv[1]); + dbDelete(src,c->argv[1]); server.dirty++; addReply(c,shared.cone); } /* =================================== Lists ================================ */ -static void pushGenericCommand(redisClient *c, int where) { - robj *lobj; - list *list; - lobj = lookupKeyWrite(c->db,c->argv[1]); + +/* Check the argument length to see if it requires us to convert the ziplist + * to a real list. Only check raw-encoded objects because integer encoded + * objects are never too long. 
*/ +static void listTypeTryConversion(robj *subject, robj *value) { + if (subject->encoding != REDIS_ENCODING_ZIPLIST) return; + if (value->encoding == REDIS_ENCODING_RAW && + sdslen(value->ptr) > server.list_max_ziplist_value) + listTypeConvert(subject,REDIS_ENCODING_LIST); +} + +static void listTypePush(robj *subject, robj *value, int where) { + /* Check if we need to convert the ziplist */ + listTypeTryConversion(subject,value); + if (subject->encoding == REDIS_ENCODING_ZIPLIST && + ziplistLen(subject->ptr) > server.list_max_ziplist_entries) + listTypeConvert(subject,REDIS_ENCODING_LIST); + + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL; + value = getDecodedObject(value); + subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos); + decrRefCount(value); + } else if (subject->encoding == REDIS_ENCODING_LIST) { + if (where == REDIS_HEAD) { + listAddNodeHead(subject->ptr,value); + } else { + listAddNodeTail(subject->ptr,value); + } + incrRefCount(value); + } else { + redisPanic("Unknown list encoding"); + } +} + +static robj *listTypePop(robj *subject, int where) { + robj *value = NULL; + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + int pos = (where == REDIS_HEAD) ? 
0 : -1; + p = ziplistIndex(subject->ptr,pos); + if (ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + /* We only need to delete an element when it exists */ + subject->ptr = ziplistDelete(subject->ptr,&p); + } + } else if (subject->encoding == REDIS_ENCODING_LIST) { + list *list = subject->ptr; + listNode *ln; + if (where == REDIS_HEAD) { + ln = listFirst(list); + } else { + ln = listLast(list); + } + if (ln != NULL) { + value = listNodeValue(ln); + incrRefCount(value); + listDelNode(list,ln); + } + } else { + redisPanic("Unknown list encoding"); + } + return value; +} + +static unsigned long listTypeLength(robj *subject) { + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + return ziplistLen(subject->ptr); + } else if (subject->encoding == REDIS_ENCODING_LIST) { + return listLength((list*)subject->ptr); + } else { + redisPanic("Unknown list encoding"); + } +} + +/* Structure to hold list iteration abstraction. */ +typedef struct { + robj *subject; + unsigned char encoding; + unsigned char direction; /* Iteration direction */ + unsigned char *zi; + listNode *ln; +} listTypeIterator; + +/* Structure for an entry while iterating over a list. */ +typedef struct { + listTypeIterator *li; + unsigned char *zi; /* Entry in ziplist */ + listNode *ln; /* Entry in linked list */ +} listTypeEntry; + +/* Initialize an iterator at the specified index. 
*/ +static listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction) { + listTypeIterator *li = zmalloc(sizeof(listTypeIterator)); + li->subject = subject; + li->encoding = subject->encoding; + li->direction = direction; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + li->zi = ziplistIndex(subject->ptr,index); + } else if (li->encoding == REDIS_ENCODING_LIST) { + li->ln = listIndex(subject->ptr,index); + } else { + redisPanic("Unknown list encoding"); + } + return li; +} + +/* Clean up the iterator. */ +static void listTypeReleaseIterator(listTypeIterator *li) { + zfree(li); +} + +/* Stores a pointer to the current entry in the provided entry structure + * and advances the position of the iterator. Returns 1 when the current + * entry is in fact an entry, 0 otherwise. */ +static int listTypeNext(listTypeIterator *li, listTypeEntry *entry) { + /* Protect from converting when iterating */ + redisAssert(li->subject->encoding == li->encoding); + + entry->li = li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + entry->zi = li->zi; + if (entry->zi != NULL) { + if (li->direction == REDIS_TAIL) + li->zi = ziplistNext(li->subject->ptr,li->zi); + else + li->zi = ziplistPrev(li->subject->ptr,li->zi); + return 1; + } + } else if (li->encoding == REDIS_ENCODING_LIST) { + entry->ln = li->ln; + if (entry->ln != NULL) { + if (li->direction == REDIS_TAIL) + li->ln = li->ln->next; + else + li->ln = li->ln->prev; + return 1; + } + } else { + redisPanic("Unknown list encoding"); + } + return 0; +} + +/* Return entry or NULL at the current position of the iterator. 
*/ +static robj *listTypeGet(listTypeEntry *entry) { + listTypeIterator *li = entry->li; + robj *value = NULL; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *vstr; + unsigned int vlen; + long long vlong; + redisAssert(entry->zi != NULL); + if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + } + } else if (li->encoding == REDIS_ENCODING_LIST) { + redisAssert(entry->ln != NULL); + value = listNodeValue(entry->ln); + incrRefCount(value); + } else { + redisPanic("Unknown list encoding"); + } + return value; +} + +/* Compare the given object with the entry at the current position. */ +static int listTypeEqual(listTypeEntry *entry, robj *o) { + listTypeIterator *li = entry->li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + redisAssert(o->encoding == REDIS_ENCODING_RAW); + return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr)); + } else if (li->encoding == REDIS_ENCODING_LIST) { + return equalStringObjects(o,listNodeValue(entry->ln)); + } else { + redisPanic("Unknown list encoding"); + } +} + +/* Delete the element pointed to. 
*/ +static void listTypeDelete(listTypeEntry *entry) { + listTypeIterator *li = entry->li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p = entry->zi; + li->subject->ptr = ziplistDelete(li->subject->ptr,&p); + + /* Update position of the iterator depending on the direction */ + if (li->direction == REDIS_TAIL) + li->zi = p; + else + li->zi = ziplistPrev(li->subject->ptr,p); + } else if (entry->li->encoding == REDIS_ENCODING_LIST) { + listNode *next; + if (li->direction == REDIS_TAIL) + next = entry->ln->next; + else + next = entry->ln->prev; + listDelNode(li->subject->ptr,entry->ln); + li->ln = next; + } else { + redisPanic("Unknown list encoding"); + } +} + +static void listTypeConvert(robj *subject, int enc) { + listTypeIterator *li; + listTypeEntry entry; + redisAssert(subject->type == REDIS_LIST); + + if (enc == REDIS_ENCODING_LIST) { + list *l = listCreate(); + + /* listTypeGet returns a robj with incremented refcount */ + li = listTypeInitIterator(subject,0,REDIS_TAIL); + while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry)); + listTypeReleaseIterator(li); + + subject->encoding = REDIS_ENCODING_LIST; + zfree(subject->ptr); + subject->ptr = l; + } else { + redisPanic("Unsupported list conversion"); + } +} + +static void pushGenericCommand(redisClient *c, int where) { + robj *lobj = lookupKeyWrite(c->db,c->argv[1]); if (lobj == NULL) { if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) { addReply(c,shared.cone); return; } - lobj = createListObject(); - list = lobj->ptr; - if (where == REDIS_HEAD) { - listAddNodeHead(list,c->argv[2]); - } else { - listAddNodeTail(list,c->argv[2]); - } - dictAdd(c->db->dict,c->argv[1],lobj); - incrRefCount(c->argv[1]); - incrRefCount(c->argv[2]); + lobj = createZiplistObject(); + dbAdd(c->db,c->argv[1],lobj); } else { if (lobj->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); @@ -4480,16 +5160,10 @@ static void pushGenericCommand(redisClient *c, int where) { 
addReply(c,shared.cone); return; } - list = lobj->ptr; - if (where == REDIS_HEAD) { - listAddNodeHead(list,c->argv[2]); - } else { - listAddNodeTail(list,c->argv[2]); - } - incrRefCount(c->argv[2]); } + listTypePush(lobj,c->argv[2],where); + addReplyLongLong(c,listTypeLength(lobj)); server.dirty++; - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(list))); } static void lpushCommand(redisClient *c) { @@ -4498,83 +5172,97 @@ static void lpushCommand(redisClient *c) { static void rpushCommand(redisClient *c) { pushGenericCommand(c,REDIS_TAIL); -} - -static void llenCommand(redisClient *c) { - robj *o; - list *l; - - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_LIST)) return; +} - l = o->ptr; - addReplyUlong(c,listLength(l)); +static void llenCommand(redisClient *c) { + robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; + addReplyUlong(c,listTypeLength(o)); } static void lindexCommand(redisClient *c) { - robj *o; + robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; int index = atoi(c->argv[2]->ptr); - list *list; - listNode *ln; - - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; + robj *value = NULL; - ln = listIndex(list, index); - if (ln == NULL) { - addReply(c,shared.nullbulk); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + p = ziplistIndex(o->ptr,index); + if (ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + addReplyBulk(c,value); + decrRefCount(value); + } else { + addReply(c,shared.nullbulk); + } + } else if (o->encoding == REDIS_ENCODING_LIST) { + listNode *ln = listIndex(o->ptr,index); + if (ln != NULL) { 
+ value = listNodeValue(ln); + addReplyBulk(c,value); + } else { + addReply(c,shared.nullbulk); + } } else { - robj *ele = listNodeValue(ln); - addReplyBulk(c,ele); + redisPanic("Unknown list encoding"); } } static void lsetCommand(redisClient *c) { - robj *o; + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; int index = atoi(c->argv[2]->ptr); - list *list; - listNode *ln; - - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - - ln = listIndex(list, index); - if (ln == NULL) { - addReply(c,shared.outofrangeerr); + robj *value = c->argv[3]; + + listTypeTryConversion(o,value); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p, *zl = o->ptr; + p = ziplistIndex(zl,index); + if (p == NULL) { + addReply(c,shared.outofrangeerr); + } else { + o->ptr = ziplistDelete(o->ptr,&p); + value = getDecodedObject(value); + o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr)); + decrRefCount(value); + addReply(c,shared.ok); + server.dirty++; + } + } else if (o->encoding == REDIS_ENCODING_LIST) { + listNode *ln = listIndex(o->ptr,index); + if (ln == NULL) { + addReply(c,shared.outofrangeerr); + } else { + decrRefCount((robj*)listNodeValue(ln)); + listNodeValue(ln) = value; + incrRefCount(value); + addReply(c,shared.ok); + server.dirty++; + } } else { - robj *ele = listNodeValue(ln); - - decrRefCount(ele); - listNodeValue(ln) = c->argv[3]; - incrRefCount(c->argv[3]); - addReply(c,shared.ok); - server.dirty++; + redisPanic("Unknown list encoding"); } } static void popGenericCommand(redisClient *c, int where) { - robj *o; - list *list; - listNode *ln; - - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - - if (where == REDIS_HEAD) - ln = listFirst(list); - else - ln = listLast(list); + robj *o = 
lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; - if (ln == NULL) { + robj *value = listTypePop(o,where); + if (value == NULL) { addReply(c,shared.nullbulk); } else { - robj *ele = listNodeValue(ln); - addReplyBulk(c,ele); - listDelNode(list,ln); - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + addReplyBulk(c,value); + decrRefCount(value); + if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; } } @@ -4588,19 +5276,16 @@ static void rpopCommand(redisClient *c) { } static void lrangeCommand(redisClient *c) { - robj *o; + robj *o, *value; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); int llen; int rangelen, j; - list *list; - listNode *ln; - robj *ele; + listTypeEntry entry; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - llen = listLength(list); + llen = listTypeLength(o); /* convert negative indexes */ if (start < 0) start = llen+start; @@ -4618,13 +5303,15 @@ static void lrangeCommand(redisClient *c) { rangelen = (end-start)+1; /* Return the result in form of a multi-bulk reply */ - ln = listIndex(list, start); addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen)); + listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL); for (j = 0; j < rangelen; j++) { - ele = listNodeValue(ln); - addReplyBulk(c,ele); - ln = ln->next; + redisAssert(listTypeNext(li,&entry)); + value = listTypeGet(&entry); + addReplyBulk(c,value); + decrRefCount(value); } + listTypeReleaseIterator(li); } static void ltrimCommand(redisClient *c) { @@ -4638,8 +5325,7 @@ static void ltrimCommand(redisClient *c) { if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL || checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - llen = listLength(list); + llen = listTypeLength(o); /* convert negative indexes */ if (start < 0) start = llen+start; @@ -4659,49 +5345,63 @@ static void 
ltrimCommand(redisClient *c) { } /* Remove list elements to perform the trim */ - for (j = 0; j < ltrim; j++) { - ln = listFirst(list); - listDelNode(list,ln); - } - for (j = 0; j < rtrim; j++) { - ln = listLast(list); - listDelNode(list,ln); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + o->ptr = ziplistDeleteRange(o->ptr,0,ltrim); + o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim); + } else if (o->encoding == REDIS_ENCODING_LIST) { + list = o->ptr; + for (j = 0; j < ltrim; j++) { + ln = listFirst(list); + listDelNode(list,ln); + } + for (j = 0; j < rtrim; j++) { + ln = listLast(list); + listDelNode(list,ln); + } + } else { + redisPanic("Unknown list encoding"); } - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; addReply(c,shared.ok); } static void lremCommand(redisClient *c) { - robj *o; - list *list; - listNode *ln, *next; + robj *subject, *obj = c->argv[3]; int toremove = atoi(c->argv[2]->ptr); int removed = 0; - int fromtail = 0; + listTypeEntry entry; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; + subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero); + if (subject == NULL || checkType(c,subject,REDIS_LIST)) return; + /* Make sure obj is raw when we're dealing with a ziplist */ + if (subject->encoding == REDIS_ENCODING_ZIPLIST) + obj = getDecodedObject(obj); + + listTypeIterator *li; if (toremove < 0) { toremove = -toremove; - fromtail = 1; + li = listTypeInitIterator(subject,-1,REDIS_HEAD); + } else { + li = listTypeInitIterator(subject,0,REDIS_TAIL); } - ln = fromtail ? list->tail : list->head; - while (ln) { - robj *ele = listNodeValue(ln); - next = fromtail ? 
ln->prev : ln->next; - if (compareStringObjects(ele,c->argv[3]) == 0) { - listDelNode(list,ln); + while (listTypeNext(li,&entry)) { + if (listTypeEqual(&entry,obj)) { + listTypeDelete(&entry); server.dirty++; removed++; if (toremove && removed == toremove) break; } - ln = next; } - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + listTypeReleaseIterator(li); + + /* Clean up raw encoded object */ + if (subject->encoding == REDIS_ENCODING_ZIPLIST) + decrRefCount(obj); + + if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]); addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed)); } @@ -4721,47 +5421,36 @@ static void lremCommand(redisClient *c) { * as well. This command was originally proposed by Ezra Zygmuntowicz. */ static void rpoplpushcommand(redisClient *c) { - robj *sobj; - list *srclist; - listNode *ln; - + robj *sobj, *value; if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,sobj,REDIS_LIST)) return; - srclist = sobj->ptr; - ln = listLast(srclist); - if (ln == NULL) { + if (listTypeLength(sobj) == 0) { addReply(c,shared.nullbulk); } else { robj *dobj = lookupKeyWrite(c->db,c->argv[2]); - robj *ele = listNodeValue(ln); - list *dstlist; - - if (dobj && dobj->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerr); - return; - } + if (dobj && checkType(c,dobj,REDIS_LIST)) return; + value = listTypePop(sobj,REDIS_TAIL); /* Add the element to the target list (unless it's directly * passed to some BLPOP-ing client */ - if (!handleClientsWaitingListPush(c,c->argv[2],ele)) { - if (dobj == NULL) { - /* Create the list if the key does not exist */ - dobj = createListObject(); - dictAdd(c->db->dict,c->argv[2],dobj); - incrRefCount(c->argv[2]); + if (!handleClientsWaitingListPush(c,c->argv[2],value)) { + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(c->db,c->argv[2],dobj); } - dstlist = dobj->ptr; - listAddNodeHead(dstlist,ele); - incrRefCount(ele); + 
listTypePush(dobj,value,REDIS_HEAD); } /* Send the element to the client as reply as well */ - addReplyBulk(c,ele); + addReplyBulk(c,value); - /* Finally remove the element from the source list */ - listDelNode(srclist,ln); - if (listLength(srclist) == 0) deleteKey(c->db,c->argv[1]); + /* listTypePop returns an object with its refcount incremented */ + decrRefCount(value); + + /* Delete the source list when it is empty */ + if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; } } @@ -4774,8 +5463,7 @@ static void saddCommand(redisClient *c) { set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { set = createSetObject(); - dictAdd(c->db->dict,c->argv[1],set); - incrRefCount(c->argv[1]); + dbAdd(c->db,c->argv[1],set); } else { if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); @@ -4800,7 +5488,7 @@ static void sremCommand(redisClient *c) { if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) { server.dirty++; if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]); addReply(c,shared.cone); } else { addReply(c,shared.czero); @@ -4831,13 +5519,12 @@ static void smoveCommand(redisClient *c) { return; } if (dictSize((dict*)srcset->ptr) == 0 && srcset != dstset) - deleteKey(c->db,c->argv[1]); + dbDelete(c->db,c->argv[1]); server.dirty++; /* Add the element to the destination set */ if (!dstset) { dstset = createSetObject(); - dictAdd(c->db->dict,c->argv[2],dstset); - incrRefCount(c->argv[2]); + dbAdd(c->db,c->argv[2],dstset); } if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK) incrRefCount(c->argv[3]); @@ -4883,7 +5570,7 @@ static void spopCommand(redisClient *c) { addReplyBulk(c,ele); dictDelete(set->ptr,ele); if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; } } 
@@ -4927,7 +5614,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long if (!setobj) { zfree(dv); if (dstkey) { - if (deleteKey(c->db,dstkey)) + if (dbDelete(c->db,dstkey)) server.dirty++; addReply(c,shared.czero); } else { @@ -4987,11 +5674,10 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long if (dstkey) { /* Store the resulting set into the target, if the intersection * is not an empty set. */ - deleteKey(c->db,dstkey); + dbDelete(c->db,dstkey); if (dictSize((dict*)dstset->ptr) > 0) { - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); - addReplyLong(c,dictSize((dict*)dstset->ptr)); + dbAdd(c->db,dstkey,dstset); + addReplyLongLong(c,dictSize((dict*)dstset->ptr)); } else { decrRefCount(dstset); addReply(c,shared.czero); @@ -5090,11 +5776,10 @@ static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnu } else { /* If we have a target key where to store the resulting set * create this key with the result set inside */ - deleteKey(c->db,dstkey); + dbDelete(c->db,dstkey); if (dictSize((dict*)dstset->ptr) > 0) { - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); - addReplyLong(c,dictSize((dict*)dstset->ptr)); + dbAdd(c->db,dstkey,dstset); + addReplyLongLong(c,dictSize((dict*)dstset->ptr)); } else { decrRefCount(dstset); addReply(c,shared.czero); @@ -5143,8 +5828,10 @@ static zskiplistNode *zslCreateNode(int level, double score, robj *obj) { zskiplistNode *zn = zmalloc(sizeof(*zn)); zn->forward = zmalloc(sizeof(zskiplistNode*) * level); - if (level > 0) + if (level > 1) zn->span = zmalloc(sizeof(unsigned int) * (level - 1)); + else + zn->span = NULL; zn->score = score; zn->obj = obj; return zn; @@ -5297,7 +5984,7 @@ static int zslDelete(zskiplist *zsl, double score, robj *obj) { /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. 
*/ x = x->forward[0]; - if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) { + if (x && score == x->score && equalStringObjects(x->obj,obj)) { zslDeleteNode(zsl, x, update); zslFreeNode(x); return 1; @@ -5386,7 +6073,7 @@ static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) { * Returns 0 when the element cannot be found, rank otherwise. * Note that the rank is 1-based due to the span of zsl->header to the * first element. */ -static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { +static unsigned long zslistTypeGetRank(zskiplist *zsl, double score, robj *o) { zskiplistNode *x; unsigned long rank = 0; int i; @@ -5402,7 +6089,7 @@ static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { } /* x might be equal to zsl->header, so test if obj is non-NULL */ - if (x->obj && compareStringObjects(x->obj,o) == 0) { + if (x->obj && equalStringObjects(x->obj,o)) { return rank; } } @@ -5410,7 +6097,7 @@ static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { } /* Finds an element by its rank. The rank argument needs to be 1-based. 
*/ -zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { +zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; int i; @@ -5439,11 +6126,15 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor zset *zs; double *score; + if (isnan(scoreval)) { + addReplySds(c,sdsnew("-ERR provide score is Not A Number (nan)\r\n")); + return; + } + zsetobj = lookupKeyWrite(c->db,key); if (zsetobj == NULL) { zsetobj = createZsetObject(); - dictAdd(c->db->dict,key,zsetobj); - incrRefCount(key); + dbAdd(c->db,key,zsetobj); } else { if (zsetobj->type != REDIS_ZSET) { addReply(c,shared.wrongtypeerr); @@ -5467,6 +6158,15 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor } else { *score = scoreval; } + if (isnan(*score)) { + addReplySds(c, + sdsnew("-ERR resulting score is Not A Number (nan)\r\n")); + zfree(score); + /* Note that we don't need to check if the zset may be empty and + * should be removed here, as we can only obtain Nan as score if + * there was already an element in the sorted set. 
*/ + return; + } } else { *score = scoreval; } @@ -5550,7 +6250,7 @@ static void zremCommand(redisClient *c) { /* Delete from the hash table */ dictDelete(zs->dict,c->argv[2]); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; addReply(c,shared.cone); } @@ -5571,9 +6271,9 @@ static void zremrangebyscoreCommand(redisClient *c) { zs = zsetobj->ptr; deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty += deleted; - addReplyLong(c,deleted); + addReplyLongLong(c,deleted); } static void zremrangebyrankCommand(redisClient *c) { @@ -5609,9 +6309,9 @@ static void zremrangebyrankCommand(redisClient *c) { * use 1-based rank */ deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty += deleted; - addReplyLong(c, deleted); + addReplyLongLong(c, deleted); } typedef struct { @@ -5630,6 +6330,7 @@ static int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) { #define REDIS_AGGR_SUM 1 #define REDIS_AGGR_MIN 2 #define REDIS_AGGR_MAX 3 +#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 
1.0 : *(double*)dictGetEntryVal(_e)) inline static void zunionInterAggregate(double *target, double val, int aggregate) { if (aggregate == REDIS_AGGR_SUM) { @@ -5645,7 +6346,7 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat } static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { - int i, j, zsetnum; + int i, j, setnum; int aggregate = REDIS_AGGR_SUM; zsetopsrc *src; robj *dstobj; @@ -5653,32 +6354,35 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictIterator *di; dictEntry *de; - /* expect zsetnum input keys to be given */ - zsetnum = atoi(c->argv[2]->ptr); - if (zsetnum < 1) { - addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNION/ZINTER\r\n")); + /* expect setnum input keys to be given */ + setnum = atoi(c->argv[2]->ptr); + if (setnum < 1) { + addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n")); return; } /* test if the expected number of keys would overflow */ - if (3+zsetnum > c->argc) { + if (3+setnum > c->argc) { addReply(c,shared.syntaxerr); return; } /* read keys to be used for input */ - src = zmalloc(sizeof(zsetopsrc) * zsetnum); - for (i = 0, j = 3; i < zsetnum; i++, j++) { - robj *zsetobj = lookupKeyWrite(c->db,c->argv[j]); - if (!zsetobj) { + src = zmalloc(sizeof(zsetopsrc) * setnum); + for (i = 0, j = 3; i < setnum; i++, j++) { + robj *obj = lookupKeyWrite(c->db,c->argv[j]); + if (!obj) { src[i].dict = NULL; } else { - if (zsetobj->type != REDIS_ZSET) { + if (obj->type == REDIS_ZSET) { + src[i].dict = ((zset*)obj->ptr)->dict; + } else if (obj->type == REDIS_SET) { + src[i].dict = (obj->ptr); + } else { zfree(src); addReply(c,shared.wrongtypeerr); return; } - src[i].dict = ((zset*)zsetobj->ptr)->dict; } /* default all weights to 1 */ @@ -5690,9 +6394,9 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { int remaining = c->argc - j; while (remaining) { - if (remaining >= 
(zsetnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { + if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { j++; remaining--; - for (i = 0; i < zsetnum; i++, j++, remaining--) { + for (i = 0; i < setnum; i++, j++, remaining--) { if (getDoubleFromObjectOrReply(c, c->argv[j], &src[i].weight, NULL) != REDIS_OK) return; } @@ -5720,7 +6424,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { /* sort sets from the smallest to largest, this will improve our * algorithm's performance */ - qsort(src,zsetnum,sizeof(zsetopsrc), qsortCompareZsetopsrcByCardinality); + qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality); dstobj = createZsetObject(); dstzset = dstobj->ptr; @@ -5733,12 +6437,12 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { di = dictGetIterator(src[0].dict); while((de = dictNext(di)) != NULL) { double *score = zmalloc(sizeof(double)), value; - *score = src[0].weight * (*(double*)dictGetEntryVal(de)); + *score = src[0].weight * zunionInterDictValue(de); - for (j = 1; j < zsetnum; j++) { + for (j = 1; j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { - value = src[j].weight * (*(double*)dictGetEntryVal(other)); + value = src[j].weight * zunionInterDictValue(other); zunionInterAggregate(score, value, aggregate); } else { break; @@ -5746,7 +6450,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { } /* skip entry when not present in every source dict */ - if (j != zsetnum) { + if (j != setnum) { zfree(score); } else { robj *o = dictGetEntryKey(de); @@ -5759,7 +6463,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictReleaseIterator(di); } } else if (op == REDIS_OP_UNION) { - for (i = 0; i < zsetnum; i++) { + for (i = 0; i < setnum; i++) { if (!src[i].dict) continue; di = dictGetIterator(src[i].dict); @@ -5768,14 +6472,14 @@ static void 
zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue; double *score = zmalloc(sizeof(double)), value; - *score = src[i].weight * (*(double*)dictGetEntryVal(de)); + *score = src[i].weight * zunionInterDictValue(de); /* because the zsets are sorted by size, its only possible * for sets at larger indices to hold this entry */ - for (j = (i+1); j < zsetnum; j++) { + for (j = (i+1); j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { - value = src[j].weight * (*(double*)dictGetEntryVal(other)); + value = src[j].weight * zunionInterDictValue(other); zunionInterAggregate(score, value, aggregate); } } @@ -5793,11 +6497,10 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION); } - deleteKey(c->db,dstkey); + dbDelete(c->db,dstkey); if (dstzset->zsl->length) { - dictAdd(c->db->dict,dstkey,dstobj); - incrRefCount(dstkey); - addReplyLong(c, dstzset->zsl->length); + dbAdd(c->db,dstkey,dstobj); + addReplyLongLong(c, dstzset->zsl->length); server.dirty++; } else { decrRefCount(dstobj); @@ -5806,11 +6509,11 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { zfree(src); } -static void zunionCommand(redisClient *c) { +static void zunionstoreCommand(redisClient *c) { zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION); } -static void zinterCommand(redisClient *c) { +static void zinterstoreCommand(redisClient *c) { zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER); } @@ -5860,10 +6563,10 @@ static void zrangeGenericCommand(redisClient *c, int reverse) { /* check if starting point is trivial, before searching * the element in log(N) time */ if (reverse) { - ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start); + ln = start == 0 ? zsl->tail : zslistTypeGetElementByRank(zsl, llen-start); } else { ln = start == 0 ? 
- zsl->header->forward[0] : zslGetElementByRank(zsl, start+1); + zsl->header->forward[0] : zslistTypeGetElementByRank(zsl, start+1); } /* Return the result in form of a multi-bulk reply */ @@ -5993,7 +6696,7 @@ static void genericZrangebyscoreCommand(redisClient *c, int justcount) { if (limit > 0) limit--; } if (justcount) { - addReplyLong(c,(long)rangelen); + addReplyLongLong(c,(long)rangelen); } else { lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n", withscores ? (rangelen*2) : rangelen); @@ -6060,12 +6763,12 @@ static void zrankGenericCommand(redisClient *c, int reverse) { } score = dictGetEntryVal(de); - rank = zslGetRank(zsl, *score, c->argv[2]); + rank = zslistTypeGetRank(zsl, *score, c->argv[2]); if (rank) { if (reverse) { - addReplyLong(c, zsl->length - rank); + addReplyLongLong(c, zsl->length - rank); } else { - addReplyLong(c, rank-1); + addReplyLongLong(c, rank-1); } } else { addReply(c,shared.nullbulk); @@ -6272,8 +6975,7 @@ static robj *hashLookupWriteOrCreate(redisClient *c, robj *key) { robj *o = lookupKeyWrite(c->db,key); if (o == NULL) { o = createHashObject(); - dictAdd(c->db->dict,key,o); - incrRefCount(key); + dbAdd(c->db,key,o); } else { if (o->type != REDIS_HASH) { addReply(c,shared.wrongtypeerr); @@ -6337,12 +7039,11 @@ static void hincrbyCommand(redisClient *c) { if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return; if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; if ((current = hashGet(o,c->argv[2])) != NULL) { - if (current->encoding == REDIS_ENCODING_RAW) - value = strtoll(current->ptr,NULL,10); - else if (current->encoding == REDIS_ENCODING_INT) - value = (long)current->ptr; - else - redisAssert(1 != 1); + if (getLongLongFromObjectOrReply(c,current,&value, + "hash value is not an integer") != REDIS_OK) { + decrRefCount(current); + return; + } decrRefCount(current); } else { value = 0; @@ -6398,7 +7099,7 @@ static void hdelCommand(redisClient *c) { checkType(c,o,REDIS_HASH)) return; if 
(hashDelete(o,c->argv[2])) { - if (hashLength(o) == 0) deleteKey(c->db,c->argv[1]); + if (hashLength(o) == 0) dbDelete(c->db,c->argv[1]); addReply(c,shared.cone); server.dirty++; } else { @@ -6491,12 +7192,14 @@ static void convertToRealHash(robj *o) { static void flushdbCommand(redisClient *c) { server.dirty += dictSize(c->db->dict); + touchWatchedKeysOnFlush(c->db->id); dictEmpty(c->db->dict); dictEmpty(c->db->expires); addReply(c,shared.ok); } static void flushallCommand(redisClient *c) { + touchWatchedKeysOnFlush(-1); server.dirty += emptyDb(); addReply(c,shared.ok); if (server.bgsavechildpid != -1) { @@ -6637,7 +7340,7 @@ static int sortCompare(const void *s1, const void *s2) { * is optimized for speed and a bit less for readability */ static void sortCommand(redisClient *c) { list *operations; - int outputlen = 0; + unsigned int outputlen = 0; int desc = 0, alpha = 0; int limit_start = 0, limit_count = -1, start, end; int j, dontsort = 0, vectorlen; @@ -6707,7 +7410,7 @@ static void sortCommand(redisClient *c) { /* Load the sorting vector with all the objects to sort */ switch(sortval->type) { - case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break; + case REDIS_LIST: vectorlen = listTypeLength(sortval); break; case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break; case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break; default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */ @@ -6716,18 +7419,15 @@ static void sortCommand(redisClient *c) { j = 0; if (sortval->type == REDIS_LIST) { - list *list = sortval->ptr; - listNode *ln; - listIter li; - - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *ele = ln->value; - vector[j].obj = ele; + listTypeIterator *li = listTypeInitIterator(sortval,0,REDIS_TAIL); + listTypeEntry entry; + while(listTypeNext(li,&entry)) { + vector[j].obj = listTypeGet(&entry); vector[j].u.score = 0; vector[j].u.cmpobj = NULL; j++; } + listTypeReleaseIterator(li); } 
else { dict *set; dictIterator *di; @@ -6837,8 +7537,7 @@ static void sortCommand(redisClient *c) { } } } else { - robj *listObject = createListObject(); - list *listPtr = (list*) listObject->ptr; + robj *sobj = createZiplistObject(); /* STORE option specified, set the sorting result as a List object */ for (j = start; j <= end; j++) { @@ -6846,33 +7545,30 @@ static void sortCommand(redisClient *c) { listIter li; if (!getop) { - listAddNodeTail(listPtr,vector[j].obj); - incrRefCount(vector[j].obj); - } - listRewind(operations,&li); - while((ln = listNext(&li))) { - redisSortOperation *sop = ln->value; - robj *val = lookupKeyByPattern(c->db,sop->pattern, - vector[j].obj); + listTypePush(sobj,vector[j].obj,REDIS_TAIL); + } else { + listRewind(operations,&li); + while((ln = listNext(&li))) { + redisSortOperation *sop = ln->value; + robj *val = lookupKeyByPattern(c->db,sop->pattern, + vector[j].obj); - if (sop->type == REDIS_SORT_GET) { - if (!val) { - listAddNodeTail(listPtr,createStringObject("",0)); + if (sop->type == REDIS_SORT_GET) { + if (!val) val = createStringObject("",0); + + /* listTypePush does an incrRefCount, so we should take care + * care of the incremented refcount caused by either + * lookupKeyByPattern or createStringObject("",0) */ + listTypePush(sobj,val,REDIS_TAIL); + decrRefCount(val); } else { - /* We should do a incrRefCount on val because it is - * added to the list, but also a decrRefCount because - * it is returned by lookupKeyByPattern. This results - * in doing nothing at all. */ - listAddNodeTail(listPtr,val); + /* always fails */ + redisAssert(sop->type == REDIS_SORT_GET); } - } else { - redisAssert(sop->type == REDIS_SORT_GET); /* always fails */ } } } - if (dictReplace(c->db->dict,storekey,listObject)) { - incrRefCount(storekey); - } + dbReplace(c->db,storekey,sobj); /* Note: we add 1 because the DB is dirty anyway since even if the * SORT result is empty a new key is set and maybe the old content * replaced. 
*/ @@ -6881,6 +7577,9 @@ static void sortCommand(redisClient *c) { } /* Cleanup */ + if (sortval->type == REDIS_LIST) + for (j = 0; j < vectorlen; j++) + decrRefCount(vector[j].obj); decrRefCount(sortval); listRelease(operations); for (j = 0; j < vectorlen; j++) { @@ -6923,6 +7622,8 @@ static sds genRedisInfoString(void) { bytesToHuman(hmem,zmalloc_used_memory()); info = sdscatprintf(sdsempty(), "redis_version:%s\r\n" + "redis_git_sha1:%s\r\n" + "redis_git_dirty:%d\r\n" "arch_bits:%s\r\n" "multiplexing_api:%s\r\n" "process_id:%ld\r\n" @@ -6940,13 +7641,15 @@ static sds genRedisInfoString(void) { "total_connections_received:%lld\r\n" "total_commands_processed:%lld\r\n" "expired_keys:%lld\r\n" - "hash_max_zipmap_entries:%ld\r\n" - "hash_max_zipmap_value:%ld\r\n" + "hash_max_zipmap_entries:%zu\r\n" + "hash_max_zipmap_value:%zu\r\n" "pubsub_channels:%ld\r\n" "pubsub_patterns:%u\r\n" "vm_enabled:%d\r\n" "role:%s\r\n" ,REDIS_VERSION, + REDIS_GIT_SHA1, + strtol(REDIS_GIT_DIRTY,NULL,10) > 0, (sizeof(long) == 8) ? "64" : "32", aeGetApiName(), (long) getpid(), @@ -7047,7 +7750,7 @@ static void monitorCommand(redisClient *c) { /* ================================= Expire ================================= */ static int removeExpire(redisDb *db, robj *key) { - if (dictDelete(db->expires,key) == DICT_OK) { + if (dictDelete(db->expires,key->ptr) == DICT_OK) { return 1; } else { return 0; @@ -7055,10 +7758,11 @@ static int removeExpire(redisDb *db, robj *key) { } static int setExpire(redisDb *db, robj *key, time_t when) { - if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) { + sds copy = sdsdup(key->ptr); + if (dictAdd(db->expires,copy,(void*)when) == DICT_ERR) { + sdsfree(copy); return 0; } else { - incrRefCount(key); return 1; } } @@ -7070,7 +7774,7 @@ static time_t getExpire(redisDb *db, robj *key) { /* No expire? 
return ASAP */ if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return -1; + (de = dictFind(db->expires,key->ptr)) == NULL) return -1; return (time_t) dictGetEntryVal(de); } @@ -7081,16 +7785,16 @@ static int expireIfNeeded(redisDb *db, robj *key) { /* No expire? return ASAP */ if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return 0; + (de = dictFind(db->expires,key->ptr)) == NULL) return 0; /* Lookup the expire */ when = (time_t) dictGetEntryVal(de); if (time(NULL) <= when) return 0; /* Delete the key */ - dictDelete(db->expires,key); + dbDelete(db,key); server.stat_expiredkeys++; - return dictDelete(db->dict,key) == DICT_OK; + return 1; } static int deleteIfVolatile(redisDb *db, robj *key) { @@ -7098,13 +7802,13 @@ static int deleteIfVolatile(redisDb *db, robj *key) { /* No expire? return ASAP */ if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return 0; + (de = dictFind(db->expires,key->ptr)) == NULL) return 0; /* Delete the key */ server.dirty++; server.stat_expiredkeys++; - dictDelete(db->expires,key); - return dictDelete(db->dict,key) == DICT_OK; + dictDelete(db->expires,key->ptr); + return dictDelete(db->dict,key->ptr) == DICT_OK; } static void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) { @@ -7115,13 +7819,13 @@ static void expireGenericCommand(redisClient *c, robj *key, robj *param, long of seconds -= offset; - de = dictFind(c->db->dict,key); + de = dictFind(c->db->dict,key->ptr); if (de == NULL) { addReply(c,shared.czero); return; } if (seconds <= 0) { - if (deleteKey(c->db,key)) server.dirty++; + if (dbDelete(c->db,key)) server.dirty++; addReply(c, shared.cone); return; } else { @@ -7197,6 +7901,10 @@ static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) { } static void multiCommand(redisClient *c) { + if (c->flags & REDIS_MULTI) { + addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n")); + return; + } c->flags 
|= REDIS_MULTI; addReply(c,shared.ok); } @@ -7213,6 +7921,20 @@ static void discardCommand(redisClient *c) { addReply(c,shared.ok); } +/* Send a MULTI command to all the slaves and AOF file. Check the execCommand + * implementation for more information. */ +static void execCommandReplicateMulti(redisClient *c) { + struct redisCommand *cmd; + robj *multistring = createStringObject("MULTI",5); + + cmd = lookupCommand("multi"); + if (server.appendonly) + feedAppendOnlyFile(cmd,c->db->id,&multistring,1); + if (listLength(server.slaves)) + replicationFeedSlaves(server.slaves,c->db->id,&multistring,1); + decrRefCount(multistring); +} + static void execCommand(redisClient *c) { int j; robj **orig_argv; @@ -7223,6 +7945,25 @@ static void execCommand(redisClient *c) { return; } + /* Check if we need to abort the EXEC if some WATCHed key was touched. + * A failed EXEC will return a multi bulk nil object. */ + if (c->flags & REDIS_DIRTY_CAS) { + freeClientMultiState(c); + initClientMultiState(c); + c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); + unwatchAllKeys(c); + addReply(c,shared.nullmultibulk); + return; + } + + /* Replicate a MULTI request now that we are sure the block is executed. + * This way we'll deliver the MULTI/..../EXEC block as a whole and + * both the AOF and the replication link will have the same consistency + * and atomicity guarantees. 
*/ + execCommandReplicateMulti(c); + + /* Exec all the queued commands */ + unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; orig_argc = c->argc; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count)); @@ -7235,7 +7976,11 @@ static void execCommand(redisClient *c) { c->argc = orig_argc; freeClientMultiState(c); initClientMultiState(c); - c->flags &= (~REDIS_MULTI); + c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); + /* Make sure the EXEC command is always replicated / AOF, since we + * always send the MULTI command (we can't know beforehand if the + * next operations will contain at least a modification to the DB). */ + server.dirty++; } /* =========================== Blocking Operations ========================= */ @@ -7258,7 +8003,7 @@ static void execCommand(redisClient *c) { * empty we need to block. In order to do so we remove the notification for * new data to read in the client socket (so that we'll not serve new * requests if the blocking request is not served). Also we put the client - * in a dictionary (db->blockingkeys) mapping keys to a list of clients + * in a dictionary (db->blocking_keys) mapping keys to a list of clients * blocking for this keys. 
* - If a PUSH operation against a key with blocked clients waiting is * performed, we serve the first in the list: basically instead to push @@ -7276,22 +8021,22 @@ static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeou list *l; int j; - c->blockingkeys = zmalloc(sizeof(robj*)*numkeys); - c->blockingkeysnum = numkeys; + c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); + c->blocking_keys_num = numkeys; c->blockingto = timeout; for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blockingkeys[j] = keys[j]; + c->blocking_keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ - de = dictFind(c->db->blockingkeys,keys[j]); + de = dictFind(c->db->blocking_keys,keys[j]); if (de == NULL) { int retval; /* For every key we take a list of clients blocked for it */ l = listCreate(); - retval = dictAdd(c->db->blockingkeys,keys[j],l); + retval = dictAdd(c->db->blocking_keys,keys[j],l); incrRefCount(keys[j]); assert(retval == DICT_OK); } else { @@ -7310,22 +8055,22 @@ static void unblockClientWaitingData(redisClient *c) { list *l; int j; - assert(c->blockingkeys != NULL); + assert(c->blocking_keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blockingkeysnum; j++) { + for (j = 0; j < c->blocking_keys_num; j++) { /* Remove this client from the list of clients waiting for this key. 
*/ - de = dictFind(c->db->blockingkeys,c->blockingkeys[j]); + de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); assert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blockingkeys,c->blockingkeys[j]); - decrRefCount(c->blockingkeys[j]); + dictDelete(c->db->blocking_keys,c->blocking_keys[j]); + decrRefCount(c->blocking_keys[j]); } /* Cleanup the client structure */ - zfree(c->blockingkeys); - c->blockingkeys = NULL; + zfree(c->blocking_keys); + c->blocking_keys = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -7352,7 +8097,7 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { list *l; listNode *ln; - de = dictFind(c->db->blockingkeys,key); + de = dictFind(c->db->blocking_keys,key); if (de == NULL) return 0; l = dictGetEntryVal(de); ln = listFirst(l); @@ -7849,7 +8594,7 @@ static void freeMemoryIfNeeded(void) { minttl = t; } } - deleteKey(server.db+j,minkey); + dbDelete(server.db+j,minkey); } } if (!freed) return; /* nothing to free... */ @@ -7858,65 +8603,70 @@ static void freeMemoryIfNeeded(void) { /* ============================== Append Only file ========================== */ -static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { - sds buf = sdsempty(); - int j; - ssize_t nwritten; - time_t now; - robj *tmpargv[3]; +/* Called when the user switches from "appendonly yes" to "appendonly no" + * at runtime using the CONFIG command. */ +static void stopAppendOnly(void) { + flushAppendOnlyFile(); + aof_fsync(server.appendfd); + close(server.appendfd); - /* The DB this command was targetting is not the same as the last command - * we appendend. To issue a SELECT command is needed. 
*/ - if (dictid != server.appendseldb) { - char seldb[64]; + server.appendfd = -1; + server.appendseldb = -1; + server.appendonly = 0; + /* rewrite operation in progress? kill it, wait child exit */ + if (server.bgsavechildpid != -1) { + int statloc; - snprintf(seldb,sizeof(seldb),"%d",dictid); - buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", - (unsigned long)strlen(seldb),seldb); - server.appendseldb = dictid; + if (kill(server.bgsavechildpid,SIGKILL) != -1) + wait3(&statloc,0,NULL); + /* reset the buffer accumulating changes while the child saves */ + sdsfree(server.bgrewritebuf); + server.bgrewritebuf = sdsempty(); + server.bgsavechildpid = -1; } +} - /* "Fix" the argv vector if the command is EXPIRE. We want to translate - * EXPIREs into EXPIREATs calls */ - if (cmd->proc == expireCommand) { - long when; - - tmpargv[0] = createStringObject("EXPIREAT",8); - tmpargv[1] = argv[1]; - incrRefCount(argv[1]); - when = time(NULL)+strtol(argv[2]->ptr,NULL,10); - tmpargv[2] = createObject(REDIS_STRING, - sdscatprintf(sdsempty(),"%ld",when)); - argv = tmpargv; +/* Called when the user switches from "appendonly no" to "appendonly yes" + * at runtime using the CONFIG command. 
*/ +static int startAppendOnly(void) { + server.appendonly = 1; + server.lastfsync = time(NULL); + server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); + if (server.appendfd == -1) { + redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno)); + return REDIS_ERR; } - - /* Append the actual command */ - buf = sdscatprintf(buf,"*%d\r\n",argc); - for (j = 0; j < argc; j++) { - robj *o = argv[j]; - - o = getDecodedObject(o); - buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr)); - buf = sdscatlen(buf,o->ptr,sdslen(o->ptr)); - buf = sdscatlen(buf,"\r\n",2); - decrRefCount(o); + if (rewriteAppendOnlyFileBackground() == REDIS_ERR) { + server.appendonly = 0; + close(server.appendfd); + redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno)); + return REDIS_ERR; } + return REDIS_OK; +} - /* Free the objects from the modified argv for EXPIREAT */ - if (cmd->proc == expireCommand) { - for (j = 0; j < 3; j++) - decrRefCount(argv[j]); - } +/* Write the append only file buffer on disk. + * + * Since we are required to write the AOF before replying to the client, + * and the only way the client socket can get a write is entering when the + * the event loop, we accumulate all the AOF writes in a memory + * buffer and write it on disk using this function just before entering + * the event loop again. */ +static void flushAppendOnlyFile(void) { + time_t now; + ssize_t nwritten; + + if (sdslen(server.aofbuf) == 0) return; /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. 
* While this will save us against the server being killed I don't think * there is much to do about the whole server stopping for power problems * or alike */ - nwritten = write(server.appendfd,buf,sdslen(buf)); - if (nwritten != (signed)sdslen(buf)) { + nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf)); + if (nwritten != (signed)sdslen(server.aofbuf)) { /* Ooops, we are in troubles. The best thing to do for now is - * to simply exit instead to give the illusion that everything is + * aborting instead of giving the illusion that everything is * working as expected. */ if (nwritten == -1) { redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno)); @@ -7925,24 +8675,105 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv } exit(1); } - /* If a background append only file rewriting is in progress we want to - * accumulate the differences between the child DB and the current one - * in a buffer, so that when the child process will do its work we - * can append the differences to the new append only file. */ - if (server.bgrewritechildpid != -1) - server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); + sdsfree(server.aofbuf); + server.aofbuf = sdsempty(); - sdsfree(buf); + /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have + * childs performing heavy I/O on disk. */ + if (server.no_appendfsync_on_rewrite && + (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1)) + return; + /* Fsync if needed */ now = time(NULL); if (server.appendfsync == APPENDFSYNC_ALWAYS || (server.appendfsync == APPENDFSYNC_EVERYSEC && now-server.lastfsync > 1)) { - fsync(server.appendfd); /* Let's try to get this data on the disk */ + /* aof_fsync is defined as fdatasync() for Linux in order to avoid + * flushing metadata. 
*/ + aof_fsync(server.appendfd); /* Let's try to get this data on the disk */ server.lastfsync = now; } } +static sds catAppendOnlyGenericCommand(sds buf, int argc, robj **argv) { + int j; + buf = sdscatprintf(buf,"*%d\r\n",argc); + for (j = 0; j < argc; j++) { + robj *o = getDecodedObject(argv[j]); + buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr)); + buf = sdscatlen(buf,o->ptr,sdslen(o->ptr)); + buf = sdscatlen(buf,"\r\n",2); + decrRefCount(o); + } + return buf; +} + +static sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) { + int argc = 3; + long when; + robj *argv[3]; + + /* Make sure we can use strtol */ + seconds = getDecodedObject(seconds); + when = time(NULL)+strtol(seconds->ptr,NULL,10); + decrRefCount(seconds); + + argv[0] = createStringObject("EXPIREAT",8); + argv[1] = key; + argv[2] = createObject(REDIS_STRING, + sdscatprintf(sdsempty(),"%ld",when)); + buf = catAppendOnlyGenericCommand(buf, argc, argv); + decrRefCount(argv[0]); + decrRefCount(argv[2]); + return buf; +} + +static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { + sds buf = sdsempty(); + robj *tmpargv[3]; + + /* The DB this command was targetting is not the same as the last command + * we appendend. To issue a SELECT command is needed. 
*/ + if (dictid != server.appendseldb) { + char seldb[64]; + + snprintf(seldb,sizeof(seldb),"%d",dictid); + buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", + (unsigned long)strlen(seldb),seldb); + server.appendseldb = dictid; + } + + if (cmd->proc == expireCommand) { + /* Translate EXPIRE into EXPIREAT */ + buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]); + } else if (cmd->proc == setexCommand) { + /* Translate SETEX to SET and EXPIREAT */ + tmpargv[0] = createStringObject("SET",3); + tmpargv[1] = argv[1]; + tmpargv[2] = argv[3]; + buf = catAppendOnlyGenericCommand(buf,3,tmpargv); + decrRefCount(tmpargv[0]); + buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]); + } else { + buf = catAppendOnlyGenericCommand(buf,argc,argv); + } + + /* Append to the AOF buffer. This will be flushed on disk just before + * of re-entering the event loop, so before the client will get a + * positive reply about the operation performed. */ + server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); + + /* If a background append only file rewriting is in progress we want to + * accumulate the differences between the child DB and the current one + * in a buffer, so that when the child process will do its work we + * can append the differences to the new append only file. */ + if (server.bgrewritechildpid != -1) + server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); + + sdsfree(buf); +} + /* In Redis commands are always executed in the context of a client, so in * order to load the append only file we need to create a fake client. 
*/ static struct redisClient *createFakeClient(void) { @@ -7960,12 +8791,14 @@ static struct redisClient *createFakeClient(void) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); + initClientMultiState(c); return c; } static void freeFakeClient(struct redisClient *c) { sdsfree(c->querybuf); listRelease(c->reply); + freeClientMultiState(c); zfree(c); } @@ -7976,7 +8809,7 @@ int loadAppendOnlyFile(char *filename) { struct redisClient *fakeClient; FILE *fp = fopen(filename,"r"); struct redis_stat sb; - unsigned long long loadedkeys = 0; + int appendonly = server.appendonly; if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) return REDIS_ERR; @@ -7986,6 +8819,10 @@ int loadAppendOnlyFile(char *filename) { exit(1); } + /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI + * to the same file we're about to read. */ + server.appendonly = 0; + fakeClient = createFakeClient(); while(1) { int argc, j; @@ -7994,6 +8831,7 @@ int loadAppendOnlyFile(char *filename) { char buf[128]; sds argsds; struct redisCommand *cmd; + int force_swapout; if (fgets(buf,sizeof(buf),fp) == NULL) { if (feof(fp)) @@ -8034,63 +8872,49 @@ int loadAppendOnlyFile(char *filename) { for (j = 0; j < argc; j++) decrRefCount(argv[j]); zfree(argv); /* Handle swapping while loading big datasets when VM is on */ - loadedkeys++; - if (server.vm_enabled && (loadedkeys % 5000) == 0) { + force_swapout = 0; + if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) + force_swapout = 1; + + if (server.vm_enabled && force_swapout) { while (zmalloc_used_memory() > server.vm_max_memory) { if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } } } + + /* This point can only be reached when EOF is reached without errors. + * If the client is in the middle of a MULTI/EXEC, log error and quit. 
*/ + if (fakeClient->flags & REDIS_MULTI) goto readerr; + fclose(fp); freeFakeClient(fakeClient); + server.appendonly = appendonly; return REDIS_OK; readerr: if (feof(fp)) { redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file"); - } else { - redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno)); - } - exit(1); -fmterr: - redisLog(REDIS_WARNING,"Bad file format reading the append only file"); - exit(1); -} - -/* Write an object into a file in the bulk format $\r\n\r\n */ -static int fwriteBulkObject(FILE *fp, robj *obj) { - char buf[128]; - int decrrc = 0; - - /* Avoid the incr/decr ref count business if possible to help - * copy-on-write (we are often in a child process when this function - * is called). - * Also makes sure that key objects don't get incrRefCount-ed when VM - * is enabled */ - if (obj->encoding != REDIS_ENCODING_RAW) { - obj = getDecodedObject(obj); - decrrc = 1; - } - snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr)); - if (fwrite(buf,strlen(buf),1,fp) == 0) goto err; - if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) - goto err; - if (fwrite("\r\n",2,1,fp) == 0) goto err; - if (decrrc) decrRefCount(obj); - return 1; -err: - if (decrrc) decrRefCount(obj); - return 0; + } else { + redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno)); + } + exit(1); +fmterr: + redisLog(REDIS_WARNING,"Bad file format reading the append only file"); + exit(1); } /* Write binary-safe string into a file in the bulkformat * $\r\n\r\n */ static int fwriteBulkString(FILE *fp, char *s, unsigned long len) { - char buf[128]; - - snprintf(buf,sizeof(buf),"$%ld\r\n",(unsigned long)len); - if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; - if (len && fwrite(s,len,1,fp) == 0) return 0; + char cbuf[128]; + int clen; + cbuf[0] = '$'; + clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len); + cbuf[clen++] = '\r'; + cbuf[clen++] = '\n'; + if 
(fwrite(cbuf,clen,1,fp) == 0) return 0; + if (len > 0 && fwrite(s,len,1,fp) == 0) return 0; if (fwrite("\r\n",2,1,fp) == 0) return 0; return 1; } @@ -8107,16 +8931,28 @@ static int fwriteBulkDouble(FILE *fp, double d) { } /* Write a long value in bulk format $\r\n\r\n */ -static int fwriteBulkLong(FILE *fp, long l) { - char buf[128], lbuf[128]; - - snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l); - snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(lbuf)-2); - if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; - if (fwrite(lbuf,strlen(lbuf),1,fp) == 0) return 0; +static int fwriteBulkLongLong(FILE *fp, long long l) { + char bbuf[128], lbuf[128]; + unsigned int blen, llen; + llen = ll2string(lbuf,32,l); + blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf); + if (fwrite(bbuf,blen,1,fp) == 0) return 0; return 1; } +/* Delegate writing an object to writing a bulk string or bulk long long. */ +static int fwriteBulkObject(FILE *fp, robj *obj) { + /* Avoid using getDecodedObject to help copy-on-write (we are often + * in a child process when this function is called). */ + if (obj->encoding == REDIS_ENCODING_INT) { + return fwriteBulkLongLong(fp,(long)obj->ptr); + } else if (obj->encoding == REDIS_ENCODING_RAW) { + return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr)); + } else { + redisPanic("Unknown string encoding"); + } +} + /* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. 
*/ static int rewriteAppendOnlyFile(char *filename) { @@ -8148,28 +8984,30 @@ static int rewriteAppendOnlyFile(char *filename) { /* SELECT the new DB */ if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkLong(fp,j) == 0) goto werr; + if (fwriteBulkLongLong(fp,j) == 0) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key, *o; + sds keystr = dictGetEntryKey(de); + robj key, *o; time_t expiretime; int swapped; - key = dictGetEntryKey(de); + keystr = dictGetEntryKey(de); + o = dictGetEntryVal(de); + initStaticStringObject(key,keystr); /* If the value for this key is swapped, load a preview in memory. * We use a "swapped" flag to remember if we need to free the * value object instead to just increment the ref count anyway * in order to avoid copy-on-write of pages if we are forked() */ - if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) { - o = dictGetEntryVal(de); + if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || + o->storage == REDIS_VM_SWAPPING) { swapped = 0; } else { - o = vmPreviewObject(key); + o = vmPreviewObject(o); swapped = 1; } - expiretime = getExpire(db,key); + expiretime = getExpire(db,&key); /* Save the key and associated value */ if (o->type == REDIS_STRING) { @@ -8177,22 +9015,45 @@ static int rewriteAppendOnlyFile(char *filename) { char cmd[]="*3\r\n$3\r\nSET\r\n"; if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; /* Key and value */ - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { /* Emit the RPUSHes needed to rebuild the list */ - list *list = o->ptr; - listNode *ln; - listIter li; + char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = o->ptr; + unsigned char *p = ziplistIndex(zl,0); + unsigned char *vstr; + unsigned int vlen; + long 
long vlong; + + while(ziplistGet(p,&vstr,&vlen,&vlong)) { + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (vstr) { + if (fwriteBulkString(fp,(char*)vstr,vlen) == 0) + goto werr; + } else { + if (fwriteBulkLongLong(fp,vlong) == 0) + goto werr; + } + p = ziplistNext(zl,p); + } + } else if (o->encoding == REDIS_ENCODING_LIST) { + list *list = o->ptr; + listNode *ln; + listIter li; - listRewind(list,&li); - while((ln = listNext(&li))) { - char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; - robj *eleobj = listNodeValue(ln); + listRewind(list,&li); + while((ln = listNext(&li))) { + robj *eleobj = listNodeValue(ln); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + } + } else { + redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { /* Emit the SADDs needed to rebuild the set */ @@ -8205,7 +9066,7 @@ static int rewriteAppendOnlyFile(char *filename) { robj *eleobj = dictGetEntryKey(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } dictReleaseIterator(di); @@ -8221,7 +9082,7 @@ static int rewriteAppendOnlyFile(char *filename) { double *score = dictGetEntryVal(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkDouble(fp,*score) == 0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } @@ -8237,7 +9098,7 @@ static int rewriteAppendOnlyFile(char *filename) { while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if 
(fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkString(fp,(char*)field,flen) == -1) return -1; if (fwriteBulkString(fp,(char*)val,vlen) == -1) @@ -8252,7 +9113,7 @@ static int rewriteAppendOnlyFile(char *filename) { robj *val = dictGetEntryVal(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,field) == -1) return -1; if (fwriteBulkObject(fp,val) == -1) return -1; } @@ -8267,8 +9128,8 @@ static int rewriteAppendOnlyFile(char *filename) { /* If this key is already expired skip it */ if (expiretime < now) continue; if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; - if (fwriteBulkLong(fp,expiretime) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr; } if (swapped) decrRefCount(o); } @@ -8277,7 +9138,7 @@ static int rewriteAppendOnlyFile(char *filename) { /* Make sure data will not remain on the OS's output buffers */ fflush(fp); - fsync(fileno(fp)); + aof_fsync(fileno(fp)); fclose(fp); /* Use RENAME to make sure the DB file is changed atomically only @@ -8392,42 +9253,49 @@ static void aofRemoveTempFile(pid_t childpid) { /* =================== Virtual Memory - Blocking Side ====================== */ -/* substitute the first occurrence of '%p' with the process pid in the - * swap file name. */ -static void expandVmSwapFilename(void) { - char *p = strstr(server.vm_swap_file,"%p"); - sds new; +/* Create a VM pointer object. This kind of objects are used in place of + * values in the key -> value hash table, for swapped out objects. 
*/ +static vmpointer *createVmPointer(int vtype) { + vmpointer *vp = zmalloc(sizeof(vmpointer)); - if (!p) return; - new = sdsempty(); - *p = '\0'; - new = sdscat(new,server.vm_swap_file); - new = sdscatprintf(new,"%ld",(long) getpid()); - new = sdscat(new,p+2); - zfree(server.vm_swap_file); - server.vm_swap_file = new; + vp->type = REDIS_VMPOINTER; + vp->storage = REDIS_VM_SWAPPED; + vp->vtype = vtype; + return vp; } static void vmInit(void) { off_t totsize; int pipefds[2]; size_t stacksize; + struct flock fl; if (server.vm_max_threads != 0) zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ - expandVmSwapFilename(); redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file); + /* Try to open the old swap file, otherwise create it */ if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) { server.vm_fp = fopen(server.vm_swap_file,"w+b"); } if (server.vm_fp == NULL) { redisLog(REDIS_WARNING, - "Impossible to open the swap file: %s. Exiting.", + "Can't open the swap file: %s. Exiting.", strerror(errno)); exit(1); } server.vm_fd = fileno(server.vm_fp); + /* Lock the swap file for writing, this is useful in order to avoid + * another instance to use the same swap file for a config error. */ + fl.l_type = F_WRLCK; + fl.l_whence = SEEK_SET; + fl.l_start = fl.l_len = 0; + if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) { + redisLog(REDIS_WARNING, + "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno)); + exit(1); + } + /* Initialize */ server.vm_next_page = 0; server.vm_near_pages = 0; server.vm_stats_used_pages = 0; @@ -8610,30 +9478,33 @@ static int vmWriteObjectOnSwap(robj *o, off_t page) { return REDIS_OK; } -/* Swap the 'val' object relative to 'key' into disk. Store all the information - * needed to later retrieve the object into the key object. +/* Transfers the 'val' object to disk. 
Store all the information + * a 'vmpointer' object containing all the information needed to load the + * object back later is returned. + * * If we can't find enough contiguous empty pages to swap the object on disk - * REDIS_ERR is returned. */ -static int vmSwapObjectBlocking(robj *key, robj *val) { + * NULL is returned. */ +static vmpointer *vmSwapObjectBlocking(robj *val) { off_t pages = rdbSavedObjectPages(val,NULL); off_t page; + vmpointer *vp; - assert(key->storage == REDIS_VM_MEMORY); - assert(key->refcount == 1); - if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR; - if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return REDIS_ERR; - key->vm.page = page; - key->vm.usedpages = pages; - key->storage = REDIS_VM_SWAPPED; - key->vtype = val->type; + assert(val->storage == REDIS_VM_MEMORY); + assert(val->refcount == 1); + if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return NULL; + if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return NULL; + + vp = createVmPointer(val->type); + vp->page = page; + vp->usedpages = pages; decrRefCount(val); /* Deallocate the object from memory. */ vmMarkPagesUsed(page,pages); - redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)", - (unsigned char*) key->ptr, + redisLog(REDIS_DEBUG,"VM: object %p swapped out at %lld (%lld pages)", + (void*) val, (unsigned long long) page, (unsigned long long) pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; - return REDIS_OK; + return vp; } static robj *vmReadObjectFromSwap(off_t page, int type) { @@ -8655,46 +9526,47 @@ static robj *vmReadObjectFromSwap(off_t page, int type) { return o; } -/* Load the value object relative to the 'key' object from swap to memory. +/* Load the specified object from swap to memory. * The newly allocated object is returned. 
* * If preview is true the unserialized object is returned to the caller but - * no changes are made to the key object, nor the pages are marked as freed */ -static robj *vmGenericLoadObject(robj *key, int preview) { + * the pages are not marked as freed, nor the vp object is freed. */ +static robj *vmGenericLoadObject(vmpointer *vp, int preview) { robj *val; - redisAssert(key->storage == REDIS_VM_SWAPPED || key->storage == REDIS_VM_LOADING); - val = vmReadObjectFromSwap(key->vm.page,key->vtype); + redisAssert(vp->type == REDIS_VMPOINTER && + (vp->storage == REDIS_VM_SWAPPED || vp->storage == REDIS_VM_LOADING)); + val = vmReadObjectFromSwap(vp->page,vp->vtype); if (!preview) { - key->storage = REDIS_VM_MEMORY; - key->vm.atime = server.unixtime; - vmMarkPagesFree(key->vm.page,key->vm.usedpages); - redisLog(REDIS_DEBUG, "VM: object %s loaded from disk", - (unsigned char*) key->ptr); + redisLog(REDIS_DEBUG, "VM: object %p loaded from disk", (void*)vp); + vmMarkPagesFree(vp->page,vp->usedpages); + zfree(vp); server.vm_stats_swapped_objects--; } else { - redisLog(REDIS_DEBUG, "VM: object %s previewed from disk", - (unsigned char*) key->ptr); + redisLog(REDIS_DEBUG, "VM: object %p previewed from disk", (void*)vp); } server.vm_stats_swapins++; return val; } -/* Plain object loading, from swap to memory */ -static robj *vmLoadObject(robj *key) { +/* Plain object loading, from swap to memory. + * + * 'o' is actually a redisVmPointer structure that will be freed by the call. + * The return value is the loaded object. */ +static robj *vmLoadObject(robj *o) { /* If we are loading the object in background, stop it, we * need to load this object synchronously ASAP. */ - if (key->storage == REDIS_VM_LOADING) - vmCancelThreadedIOJob(key); - return vmGenericLoadObject(key,0); + if (o->storage == REDIS_VM_LOADING) + vmCancelThreadedIOJob(o); + return vmGenericLoadObject((vmpointer*)o,0); } /* Just load the value on disk, without to modify the key. 
* This is useful when we want to perform some operation on the value * without to really bring it from swap to memory, like while saving the * dataset or rewriting the append only log. */ -static robj *vmPreviewObject(robj *key) { - return vmGenericLoadObject(key,1); +static robj *vmPreviewObject(robj *o) { + return vmGenericLoadObject((vmpointer*)o,1); } /* How a good candidate is this object for swapping? @@ -8709,14 +9581,16 @@ static robj *vmPreviewObject(robj *key) { * proportionally, this is why we use the logarithm. This algorithm is * just a first try and will probably be tuned later. */ static double computeObjectSwappability(robj *o) { - time_t age = server.unixtime - o->vm.atime; + /* actual age can be >= minage, but not < minage. As we use wrapping + * 21 bit clocks with minutes resolution for the LRU. */ + time_t minage = abs(server.lruclock - o->lru); long asize = 0; list *l; dict *d; struct dictEntry *de; int z; - if (age <= 0) return 0; + if (minage <= 0) return 0; switch(o->type) { case REDIS_STRING: if (o->encoding != REDIS_ENCODING_RAW) { @@ -8735,8 +9609,7 @@ static double computeObjectSwappability(robj *o) { long elesize; elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); asize += (sizeof(listNode)+elesize)*listLength(l); } break; @@ -8754,8 +9627,7 @@ static double computeObjectSwappability(robj *o) { de = dictGetRandomKey(d); ele = dictGetEntryKey(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); asize += (sizeof(struct dictEntry)+elesize)*dictSize(d); if (z) asize += sizeof(zskiplistNode)*dictSize(d); } @@ -8782,18 +9654,16 @@ static double computeObjectSwappability(robj *o) { de = dictGetRandomKey(d); ele = dictGetEntryKey(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? 
- (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); ele = dictGetEntryVal(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); asize += (sizeof(struct dictEntry)+elesize)*dictSize(d); } } break; } - return (double)age*log(1+asize); + return (double)minage*log(1+asize); } /* Try to swap an object that's a good candidate for swapping. @@ -8807,7 +9677,8 @@ static int vmSwapOneObject(int usethreads) { struct dictEntry *best = NULL; double best_swappability = 0; redisDb *best_db = NULL; - robj *key, *val; + robj *val; + sds key; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -8823,16 +9694,14 @@ static int vmSwapOneObject(int usethreads) { if (maxtries) maxtries--; de = dictGetRandomKey(db->dict); - key = dictGetEntryKey(de); val = dictGetEntryVal(de); /* Only swap objects that are currently in memory. * - * Also don't swap shared objects if threaded VM is on, as we - * try to ensure that the main thread does not touch the + * Also don't swap shared objects: not a good idea in general and + * we need to ensure that the main thread does not touch the * object while the I/O thread is using it, but we can't * control other keys without adding additional mutex. 
*/ - if (key->storage != REDIS_VM_MEMORY || - (server.vm_max_threads != 0 && val->refcount != 1)) { + if (val->storage != REDIS_VM_MEMORY || val->refcount != 1) { if (maxtries) i--; /* don't count this try */ continue; } @@ -8849,21 +9718,19 @@ static int vmSwapOneObject(int usethreads) { val = dictGetEntryVal(best); redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f", - key->ptr, best_swappability); + key, best_swappability); - /* Unshare the key if needed */ - if (key->refcount > 1) { - robj *newkey = dupStringObject(key); - decrRefCount(key); - key = dictGetEntryKey(best) = newkey; - } /* Swap it */ if (usethreads) { - vmSwapObjectThreaded(key,val,best_db); + robj *keyobj = createStringObject(key,sdslen(key)); + vmSwapObjectThreaded(keyobj,val,best_db); + decrRefCount(keyobj); return REDIS_OK; } else { - if (vmSwapObjectBlocking(key,val) == REDIS_OK) { - dictGetEntryVal(best) = NULL; + vmpointer *vp; + + if ((vp = vmSwapObjectBlocking(val)) != NULL) { + dictGetEntryVal(best) = vp; return REDIS_OK; } else { return REDIS_ERR; @@ -8886,30 +9753,20 @@ static int vmCanSwapOut(void) { return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1); } -/* Delete a key if swapped. Returns 1 if the key was found, was swapped - * and was deleted. Otherwise 0 is returned. */ -static int deleteIfSwapped(redisDb *db, robj *key) { - dictEntry *de; - robj *foundkey; - - if ((de = dictFind(db->dict,key)) == NULL) return 0; - foundkey = dictGetEntryKey(de); - if (foundkey->storage == REDIS_VM_MEMORY) return 0; - deleteKey(db,key); - return 1; -} - /* =================== Virtual Memory - Threaded I/O ======================= */ static void freeIOJob(iojob *j) { if ((j->type == REDIS_IOJOB_PREPARE_SWAP || j->type == REDIS_IOJOB_DO_SWAP || j->type == REDIS_IOJOB_LOAD) && j->val != NULL) + { + /* we fix the storage type, otherwise decrRefCount() will try to + * kill the I/O thread Job (that does no longer exists). 
*/ + if (j->val->storage == REDIS_VM_SWAPPING) + j->val->storage = REDIS_VM_MEMORY; decrRefCount(j->val); - /* We don't decrRefCount the j->key field as we did't incremented - * the count creating IO Jobs. This is because the key field here is - * just used as an indentifier and if a key is removed the Job should - * never be touched again. */ + } + decrRefCount(j->key); zfree(j); } @@ -8930,7 +9787,6 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, while((retval = read(fd,buf,1)) == 1) { iojob *j; listNode *ln; - robj *key; struct dictEntry *de; redisLog(REDIS_DEBUG,"Processing I/O completed job"); @@ -8953,27 +9809,26 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, } /* Post process it in the main thread, as there are things we * can do just here to avoid race conditions and/or invasive locks */ - redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount); - de = dictFind(j->db->dict,j->key); - assert(de != NULL); - key = dictGetEntryKey(de); + redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr); + de = dictFind(j->db->dict,j->key->ptr); + redisAssert(de != NULL); if (j->type == REDIS_IOJOB_LOAD) { redisDb *db; + vmpointer *vp = dictGetEntryVal(de); /* Key loaded, bring it at home */ - key->storage = REDIS_VM_MEMORY; - key->vm.atime = server.unixtime; - vmMarkPagesFree(key->vm.page,key->vm.usedpages); + vmMarkPagesFree(vp->page,vp->usedpages); redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)", - (unsigned char*) key->ptr); + (unsigned char*) j->key->ptr); server.vm_stats_swapped_objects--; server.vm_stats_swapins++; dictGetEntryVal(de) = j->val; incrRefCount(j->val); db = j->db; - freeIOJob(j); /* Handle clients waiting for this key to be loaded. 
*/ - handleClientsBlockedOnSwappedKey(db,key); + handleClientsBlockedOnSwappedKey(db,j->key); + freeIOJob(j); + zfree(vp); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { /* Now we know the amount of pages required to swap this object. * Let's find some space for it, and queue this task again @@ -8983,8 +9838,8 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, { /* Ooops... no space or we can't swap as there is * a fork()ed Redis trying to save stuff on disk. */ + j->val->storage = REDIS_VM_MEMORY; /* undo operation */ freeIOJob(j); - key->storage = REDIS_VM_MEMORY; /* undo operation */ } else { /* Note that we need to mark this pages as used now, * if the job will be canceled, we'll mark them as freed @@ -8996,28 +9851,29 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, unlockThreadedIO(); } } else if (j->type == REDIS_IOJOB_DO_SWAP) { - robj *val; + vmpointer *vp; /* Key swapped. We can finally free some memory. */ - if (key->storage != REDIS_VM_SWAPPING) { - printf("key->storage: %d\n",key->storage); - printf("key->name: %s\n",(char*)key->ptr); - printf("key->refcount: %d\n",key->refcount); + if (j->val->storage != REDIS_VM_SWAPPING) { + vmpointer *vp = (vmpointer*) j->id; + printf("storage: %d\n",vp->storage); + printf("key->name: %s\n",(char*)j->key->ptr); printf("val: %p\n",(void*)j->val); printf("val->type: %d\n",j->val->type); printf("val->ptr: %s\n",(char*)j->val->ptr); } - redisAssert(key->storage == REDIS_VM_SWAPPING); - val = dictGetEntryVal(de); - key->vm.page = j->page; - key->vm.usedpages = j->pages; - key->storage = REDIS_VM_SWAPPED; - key->vtype = j->val->type; - decrRefCount(val); /* Deallocate the object from memory. 
*/ - dictGetEntryVal(de) = NULL; + redisAssert(j->val->storage == REDIS_VM_SWAPPING); + vp = createVmPointer(j->val->type); + vp->page = j->page; + vp->usedpages = j->pages; + dictGetEntryVal(de) = vp; + /* Fix the storage otherwise decrRefCount will attempt to + * remove the associated I/O job */ + j->val->storage = REDIS_VM_MEMORY; + decrRefCount(j->val); redisLog(REDIS_DEBUG, "VM: object %s swapped out at %lld (%lld pages) (threaded)", - (unsigned char*) key->ptr, + (unsigned char*) j->key->ptr, (unsigned long long) j->page, (unsigned long long) j->pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; @@ -9072,7 +9928,7 @@ static void vmCancelThreadedIOJob(robj *o) { assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING); again: lockThreadedIO(); - /* Search for a matching key in one of the queues */ + /* Search for a matching object in one of the queues */ for (i = 0; i < 3; i++) { listNode *ln; listIter li; @@ -9082,9 +9938,9 @@ again: iojob *job = ln->value; if (job->canceled) continue; /* Skip this, already canceled. */ - if (job->key == o) { - redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n", - (void*)job, (char*)o->ptr, job->type, i); + if (job->id == o) { + redisLog(REDIS_DEBUG,"*** CANCELED %p (key %s) (type %d) (LIST ID %d)\n", + (void*)job, (char*)job->key->ptr, job->type, i); /* Mark the pages as free since the swap didn't happened * or happened but is now discarded. 
*/ if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP) @@ -9132,12 +9988,14 @@ again: else if (o->storage == REDIS_VM_SWAPPING) o->storage = REDIS_VM_MEMORY; unlockThreadedIO(); + redisLog(REDIS_DEBUG,"*** DONE"); return; } } } unlockThreadedIO(); - assert(1 != 1); /* We should never reach this */ + printf("Not found: %p\n", (void*)o); + redisAssert(1 != 1); /* We should never reach this */ } static void *IOThreadEntryPoint(void *arg) { @@ -9170,7 +10028,8 @@ static void *IOThreadEntryPoint(void *arg) { /* Process the Job */ if (j->type == REDIS_IOJOB_LOAD) { - j->val = vmReadObjectFromSwap(j->page,j->key->vtype); + vmpointer *vp = (vmpointer*)j->id; + j->val = vmReadObjectFromSwap(j->page,vp->vtype); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { FILE *fp = fopen("/dev/null","w+"); j->pages = rdbSavedObjectPages(j->val,fp); @@ -9266,18 +10125,16 @@ static void queueIOJob(iojob *j) { static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { iojob *j; - assert(key->storage == REDIS_VM_MEMORY); - assert(key->refcount == 1); - j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_PREPARE_SWAP; j->db = db; j->key = key; - j->val = val; + incrRefCount(key); + j->id = j->val = val; incrRefCount(val); j->canceled = 0; j->thread = (pthread_t) -1; - key->storage = REDIS_VM_SWAPPING; + val->storage = REDIS_VM_SWAPPING; lockThreadedIO(); queueIOJob(j); @@ -9299,9 +10156,9 @@ static int waitForSwappedKey(redisClient *c, robj *key) { /* If the key does not exist or is already in RAM we don't need to * block the client at all. */ - de = dictFind(c->db->dict,key); + de = dictFind(c->db->dict,key->ptr); if (de == NULL) return 0; - o = dictGetEntryKey(de); + o = dictGetEntryVal(de); if (o->storage == REDIS_VM_MEMORY) { return 0; } else if (o->storage == REDIS_VM_SWAPPING) { @@ -9335,14 +10192,16 @@ static int waitForSwappedKey(redisClient *c, robj *key) { /* Are we already loading the key from disk? 
If not create a job */ if (o->storage == REDIS_VM_SWAPPED) { iojob *j; + vmpointer *vp = (vmpointer*)o; o->storage = REDIS_VM_LOADING; j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_LOAD; j->db = c->db; - j->key = o; - j->key->vtype = o->vtype; - j->page = o->vm.page; + j->id = (robj*)vp; + j->key = key; + incrRefCount(key); + j->page = vp->page; j->val = NULL; j->canceled = 0; j->thread = (pthread_t) -1; @@ -9353,12 +10212,56 @@ static int waitForSwappedKey(redisClient *c, robj *key) { return 1; } -/* Preload keys needed for the ZUNION and ZINTER commands. */ -static void zunionInterBlockClientOnSwappedKeys(redisClient *c) { +/* Preload keys for any command with first, last and step values for + * the command keys prototype, as defined in the command table. */ +static void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { + int j, last; + if (cmd->vm_firstkey == 0) return; + last = cmd->vm_lastkey; + if (last < 0) last = argc+last; + for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) { + redisAssert(j < argc); + waitForSwappedKey(c,argv[j]); + } +} + +/* Preload keys needed for the ZUNIONSTORE and ZINTERSTORE commands. + * Note that the number of keys to preload is user-defined, so we need to + * apply a sanity check against argc. */ +static void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { int i, num; - num = atoi(c->argv[2]->ptr); + REDIS_NOTUSED(cmd); + + num = atoi(argv[2]->ptr); + if (num > (argc-3)) return; for (i = 0; i < num; i++) { - waitForSwappedKey(c,c->argv[3+i]); + waitForSwappedKey(c,argv[3+i]); + } +} + +/* Preload keys needed to execute the entire MULTI/EXEC block. + * + * This function is called by blockClientOnSwappedKeys when EXEC is issued, + * and will block the client when any command requires a swapped out value. 
*/ +static void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { + int i, margc; + struct redisCommand *mcmd; + robj **margv; + REDIS_NOTUSED(cmd); + REDIS_NOTUSED(argc); + REDIS_NOTUSED(argv); + + if (!(c->flags & REDIS_MULTI)) return; + for (i = 0; i < c->mstate.count; i++) { + mcmd = c->mstate.commands[i].cmd; + margc = c->mstate.commands[i].argc; + margv = c->mstate.commands[i].argv; + + if (mcmd->vm_preload_proc != NULL) { + mcmd->vm_preload_proc(c,mcmd,margc,margv); + } else { + waitForMultipleSwappedKeys(c,mcmd,margc,margv); + } } } @@ -9372,17 +10275,11 @@ static void zunionInterBlockClientOnSwappedKeys(redisClient *c) { * * Return 1 if the client is marked as blocked, 0 if the client can * continue as the keys it is going to access appear to be in memory. */ -static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c) { - int j, last; - +static int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) { if (cmd->vm_preload_proc != NULL) { - cmd->vm_preload_proc(c); + cmd->vm_preload_proc(c,cmd,c->argc,c->argv); } else { - if (cmd->vm_firstkey == 0) return 0; - last = cmd->vm_lastkey; - if (last < 0) last = c->argc+last; - for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) - waitForSwappedKey(c,c->argv[j]); + waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv); } /* If the client was blocked for at least one key, mark it as blocked. */ @@ -9409,7 +10306,7 @@ static int dontWaitForSwappedKey(redisClient *c, robj *key) { /* Remove the key from the list of keys this client is waiting for. 
*/ listRewind(c->io_keys,&li); while ((ln = listNext(&li)) != NULL) { - if (compareStringObjects(ln->value,key) == 0) { + if (equalStringObjects(ln->value,key)) { listDelNode(c->io_keys,ln); break; } @@ -9429,6 +10326,8 @@ static int dontWaitForSwappedKey(redisClient *c, robj *key) { return listLength(c->io_keys) == 0; } +/* Every time we now a key was loaded back in memory, we handle clients + * waiting for this key if any. */ static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { struct dictEntry *de; list *l; @@ -9458,6 +10357,8 @@ static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { static void configSetCommand(redisClient *c) { robj *o = getDecodedObject(c->argv[3]); + long long ll; + if (!strcasecmp(c->argv[2]->ptr,"dbfilename")) { zfree(server.dbfilename); server.dbfilename = zstrdup(o->ptr); @@ -9468,7 +10369,79 @@ static void configSetCommand(redisClient *c) { zfree(server.masterauth); server.masterauth = zstrdup(o->ptr); } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory")) { - server.maxmemory = strtoll(o->ptr, NULL, 10); + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0) goto badfmt; + server.maxmemory = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"timeout")) { + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0 || ll > LONG_MAX) goto badfmt; + server.maxidletime = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"appendfsync")) { + if (!strcasecmp(o->ptr,"no")) { + server.appendfsync = APPENDFSYNC_NO; + } else if (!strcasecmp(o->ptr,"everysec")) { + server.appendfsync = APPENDFSYNC_EVERYSEC; + } else if (!strcasecmp(o->ptr,"always")) { + server.appendfsync = APPENDFSYNC_ALWAYS; + } else { + goto badfmt; + } + } else if (!strcasecmp(c->argv[2]->ptr,"no-appendfsync-on-rewrite")) { + int yn = yesnotoi(o->ptr); + + if (yn == -1) goto badfmt; + server.no_appendfsync_on_rewrite = yn; + } else if (!strcasecmp(c->argv[2]->ptr,"appendonly")) { + int old = server.appendonly; + int new = yesnotoi(o->ptr); + + if 
(new == -1) goto badfmt; + if (old != new) { + if (new == 0) { + stopAppendOnly(); + } else { + if (startAppendOnly() == REDIS_ERR) { + addReplySds(c,sdscatprintf(sdsempty(), + "-ERR Unable to turn on AOF. Check server logs.\r\n")); + decrRefCount(o); + return; + } + } + } + } else if (!strcasecmp(c->argv[2]->ptr,"save")) { + int vlen, j; + sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen); + + /* Perform sanity check before setting the new config: + * - Even number of args + * - Seconds >= 1, changes >= 0 */ + if (vlen & 1) { + sdsfreesplitres(v,vlen); + goto badfmt; + } + for (j = 0; j < vlen; j++) { + char *eptr; + long val; + + val = strtoll(v[j], &eptr, 10); + if (eptr[0] != '\0' || + ((j & 1) == 0 && val < 1) || + ((j & 1) == 1 && val < 0)) { + sdsfreesplitres(v,vlen); + goto badfmt; + } + } + /* Finally set the new config */ + resetServerSaveParams(); + for (j = 0; j < vlen; j += 2) { + time_t seconds; + int changes; + + seconds = strtoll(v[j],NULL,10); + changes = strtoll(v[j+1],NULL,10); + appendServerSaveParams(seconds, changes); + } + sdsfreesplitres(v,vlen); } else { addReplySds(c,sdscatprintf(sdsempty(), "-ERR not supported CONFIG parameter %s\r\n", @@ -9478,6 +10451,14 @@ static void configSetCommand(redisClient *c) { } decrRefCount(o); addReply(c,shared.ok); + return; + +badfmt: /* Bad format errors */ + addReplySds(c,sdscatprintf(sdsempty(), + "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n", + (char*)o->ptr, + (char*)c->argv[2]->ptr)); + decrRefCount(o); } static void configGetCommand(redisClient *c) { @@ -9507,11 +10488,58 @@ static void configGetCommand(redisClient *c) { if (stringmatch(pattern,"maxmemory",0)) { char buf[128]; - snprintf(buf,128,"%llu\n",server.maxmemory); + ll2string(buf,128,server.maxmemory); addReplyBulkCString(c,"maxmemory"); addReplyBulkCString(c,buf); matches++; } + if (stringmatch(pattern,"timeout",0)) { + char buf[128]; + + ll2string(buf,128,server.maxidletime); + addReplyBulkCString(c,"timeout"); + 
addReplyBulkCString(c,buf); + matches++; + } + if (stringmatch(pattern,"appendonly",0)) { + addReplyBulkCString(c,"appendonly"); + addReplyBulkCString(c,server.appendonly ? "yes" : "no"); + matches++; + } + if (stringmatch(pattern,"no-appendfsync-on-rewrite",0)) { + addReplyBulkCString(c,"no-appendfsync-on-rewrite"); + addReplyBulkCString(c,server.no_appendfsync_on_rewrite ? "yes" : "no"); + matches++; + } + if (stringmatch(pattern,"appendfsync",0)) { + char *policy; + + switch(server.appendfsync) { + case APPENDFSYNC_NO: policy = "no"; break; + case APPENDFSYNC_EVERYSEC: policy = "everysec"; break; + case APPENDFSYNC_ALWAYS: policy = "always"; break; + default: policy = "unknown"; break; /* too harmless to panic */ + } + addReplyBulkCString(c,"appendfsync"); + addReplyBulkCString(c,policy); + matches++; + } + if (stringmatch(pattern,"save",0)) { + sds buf = sdsempty(); + int j; + + for (j = 0; j < server.saveparamslen; j++) { + buf = sdscatprintf(buf,"%ld %d", + server.saveparams[j].seconds, + server.saveparams[j].changes); + if (j != server.saveparamslen-1) + buf = sdscatlen(buf," ",1); + } + addReplyBulkCString(c,"save"); + addReplyBulkCString(c,buf); + sdsfree(buf); + matches++; + } decrRefCount(o); lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2); } @@ -9555,7 +10583,7 @@ static int listMatchPubsubPattern(void *a, void *b) { pubsubPattern *pa = a, *pb = b; return (pa->client == pb->client) && - (compareStringObjects(pa->pattern,pb->pattern) == 0); + (equalStringObjects(pa->pattern,pb->pattern)); } /* Subscribe a client to a channel. 
Returns 1 if the operation succeeded, or @@ -9584,7 +10612,7 @@ static int pubsubSubscribeChannel(redisClient *c, robj *channel) { addReply(c,shared.mbulk3); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); - addReplyLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); + addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); return retval; } @@ -9620,7 +10648,7 @@ static int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { addReply(c,shared.mbulk3); addReply(c,shared.unsubscribebulk); addReplyBulk(c,channel); - addReplyLong(c,dictSize(c->pubsub_channels)+ + addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } @@ -9646,7 +10674,7 @@ static int pubsubSubscribePattern(redisClient *c, robj *pattern) { addReply(c,shared.mbulk3); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); - addReplyLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); + addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); return retval; } @@ -9671,7 +10699,7 @@ static int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { addReply(c,shared.mbulk3); addReply(c,shared.punsubscribebulk); addReplyBulk(c,pattern); - addReplyLong(c,dictSize(c->pubsub_channels)+ + addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } decrRefCount(pattern); @@ -9746,8 +10774,9 @@ static int pubsubPublishMessage(robj *channel, robj *message) { sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { - addReply(pat->client,shared.mbulk3); - addReply(pat->client,shared.messagebulk); + addReply(pat->client,shared.mbulk4); + addReply(pat->client,shared.pmessagebulk); + addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++; @@ -9798,11 +10827,324 @@ static void punsubscribeCommand(redisClient *c) { static void publishCommand(redisClient *c) 
{ int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); - addReplyLong(c,receivers); + addReplyLongLong(c,receivers); +} + +/* ===================== WATCH (CAS alike for MULTI/EXEC) =================== + * + * The implementation uses a per-DB hash table mapping keys to list of clients + * WATCHing those keys, so that given a key that is going to be modified + * we can mark all the associated clients as dirty. + * + * Also every client contains a list of WATCHed keys so that's possible to + * un-watch such keys when the client is freed or when UNWATCH is called. */ + +/* In the client->watched_keys list we need to use watchedKey structures + * as in order to identify a key in Redis we need both the key name and the + * DB */ +typedef struct watchedKey { + robj *key; + redisDb *db; +} watchedKey; + +/* Watch for the specified key */ +static void watchForKey(redisClient *c, robj *key) { + list *clients = NULL; + listIter li; + listNode *ln; + watchedKey *wk; + + /* Check if we are already watching for this key */ + listRewind(c->watched_keys,&li); + while((ln = listNext(&li))) { + wk = listNodeValue(ln); + if (wk->db == c->db && equalStringObjects(key,wk->key)) + return; /* Key already watched */ + } + /* This key is not already watched in this DB. Let's add it */ + clients = dictFetchValue(c->db->watched_keys,key); + if (!clients) { + clients = listCreate(); + dictAdd(c->db->watched_keys,key,clients); + incrRefCount(key); + } + listAddNodeTail(clients,c); + /* Add the new key to the lits of keys watched by this client */ + wk = zmalloc(sizeof(*wk)); + wk->key = key; + wk->db = c->db; + incrRefCount(key); + listAddNodeTail(c->watched_keys,wk); +} + +/* Unwatch all the keys watched by this client. To clean the EXEC dirty + * flag is up to the caller. 
*/ +static void unwatchAllKeys(redisClient *c) { + listIter li; + listNode *ln; + + if (listLength(c->watched_keys) == 0) return; + listRewind(c->watched_keys,&li); + while((ln = listNext(&li))) { + list *clients; + watchedKey *wk; + + /* Lookup the watched key -> clients list and remove the client + * from the list */ + wk = listNodeValue(ln); + clients = dictFetchValue(wk->db->watched_keys, wk->key); + assert(clients != NULL); + listDelNode(clients,listSearchKey(clients,c)); + /* Kill the entry at all if this was the only client */ + if (listLength(clients) == 0) + dictDelete(wk->db->watched_keys, wk->key); + /* Remove this watched key from the client->watched list */ + listDelNode(c->watched_keys,ln); + decrRefCount(wk->key); + zfree(wk); + } +} + +/* "Touch" a key, so that if this key is being WATCHed by some client the + * next EXEC will fail. */ +static void touchWatchedKey(redisDb *db, robj *key) { + list *clients; + listIter li; + listNode *ln; + + if (dictSize(db->watched_keys) == 0) return; + clients = dictFetchValue(db->watched_keys, key); + if (!clients) return; + + /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ + /* Check if we are already watching for this key */ + listRewind(clients,&li); + while((ln = listNext(&li))) { + redisClient *c = listNodeValue(ln); + + c->flags |= REDIS_DIRTY_CAS; + } +} + +/* On FLUSHDB or FLUSHALL all the watched keys that are present before the + * flush but will be deleted as effect of the flushing operation should + * be touched. "dbid" is the DB that's getting the flush. -1 if it is + * a FLUSHALL operation (all the DBs flushed). 
*/ +static void touchWatchedKeysOnFlush(int dbid) { + listIter li1, li2; + listNode *ln; + + /* For every client, check all the waited keys */ + listRewind(server.clients,&li1); + while((ln = listNext(&li1))) { + redisClient *c = listNodeValue(ln); + listRewind(c->watched_keys,&li2); + while((ln = listNext(&li2))) { + watchedKey *wk = listNodeValue(ln); + + /* For every watched key matching the specified DB, if the + * key exists, mark the client as dirty, as the key will be + * removed. */ + if (dbid == -1 || wk->db->id == dbid) { + if (dictFind(wk->db->dict, wk->key->ptr) != NULL) + c->flags |= REDIS_DIRTY_CAS; + } + } + } +} + +static void watchCommand(redisClient *c) { + int j; + + if (c->flags & REDIS_MULTI) { + addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n")); + return; + } + for (j = 1; j < c->argc; j++) + watchForKey(c,c->argv[j]); + addReply(c,shared.ok); +} + +static void unwatchCommand(redisClient *c) { + unwatchAllKeys(c); + c->flags &= (~REDIS_DIRTY_CAS); + addReply(c,shared.ok); } /* ================================= Debugging ============================== */ +/* Compute the sha1 of string at 's' with 'len' bytes long. + * The SHA1 is then xored againt the string pointed by digest. + * Since xor is commutative, this operation is used in order to + * "add" digests relative to unordered elements. 
+ * + * So digest(a,b,c,d) will be the same of digest(b,a,c,d) */ +static void xorDigest(unsigned char *digest, void *ptr, size_t len) { + SHA1_CTX ctx; + unsigned char hash[20], *s = ptr; + int j; + + SHA1Init(&ctx); + SHA1Update(&ctx,s,len); + SHA1Final(hash,&ctx); + + for (j = 0; j < 20; j++) + digest[j] ^= hash[j]; +} + +static void xorObjectDigest(unsigned char *digest, robj *o) { + o = getDecodedObject(o); + xorDigest(digest,o->ptr,sdslen(o->ptr)); + decrRefCount(o); +} + +/* This function instead of just computing the SHA1 and xoring it + * against diget, also perform the digest of "digest" itself and + * replace the old value with the new one. + * + * So the final digest will be: + * + * digest = SHA1(digest xor SHA1(data)) + * + * This function is used every time we want to preserve the order so + * that digest(a,b,c,d) will be different than digest(b,c,d,a) + * + * Also note that mixdigest("foo") followed by mixdigest("bar") + * will lead to a different digest compared to "fo", "obar". + */ +static void mixDigest(unsigned char *digest, void *ptr, size_t len) { + SHA1_CTX ctx; + char *s = ptr; + + xorDigest(digest,s,len); + SHA1Init(&ctx); + SHA1Update(&ctx,digest,20); + SHA1Final(digest,&ctx); +} + +static void mixObjectDigest(unsigned char *digest, robj *o) { + o = getDecodedObject(o); + mixDigest(digest,o->ptr,sdslen(o->ptr)); + decrRefCount(o); +} + +/* Compute the dataset digest. Since keys, sets elements, hashes elements + * are not ordered, we use a trick: every aggregate digest is the xor + * of the digests of their elements. This way the order will not change + * the result. For list instead we use a feedback entering the output digest + * as input in order to ensure that a different ordered list will result in + * a different digest. 
*/ +static void computeDatasetDigest(unsigned char *final) { + unsigned char digest[20]; + char buf[128]; + dictIterator *di = NULL; + dictEntry *de; + int j; + uint32_t aux; + + memset(final,0,20); /* Start with a clean result */ + + for (j = 0; j < server.dbnum; j++) { + redisDb *db = server.db+j; + + if (dictSize(db->dict) == 0) continue; + di = dictGetIterator(db->dict); + + /* hash the DB id, so the same dataset moved in a different + * DB will lead to a different digest */ + aux = htonl(j); + mixDigest(final,&aux,sizeof(aux)); + + /* Iterate this DB writing every entry */ + while((de = dictNext(di)) != NULL) { + sds key; + robj *keyobj, *o; + time_t expiretime; + + memset(digest,0,20); /* This key-val digest */ + key = dictGetEntryKey(de); + keyobj = createStringObject(key,sdslen(key)); + + mixDigest(digest,key,sdslen(key)); + + /* Make sure the key is loaded if VM is active */ + o = lookupKeyRead(db,keyobj); + + aux = htonl(o->type); + mixDigest(digest,&aux,sizeof(aux)); + expiretime = getExpire(db,keyobj); + + /* Save the key and associated value */ + if (o->type == REDIS_STRING) { + mixObjectDigest(digest,o); + } else if (o->type == REDIS_LIST) { + listTypeIterator *li = listTypeInitIterator(o,0,REDIS_TAIL); + listTypeEntry entry; + while(listTypeNext(li,&entry)) { + robj *eleobj = listTypeGet(&entry); + mixObjectDigest(digest,eleobj); + decrRefCount(eleobj); + } + listTypeReleaseIterator(li); + } else if (o->type == REDIS_SET) { + dict *set = o->ptr; + dictIterator *di = dictGetIterator(set); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetEntryKey(de); + + xorObjectDigest(digest,eleobj); + } + dictReleaseIterator(di); + } else if (o->type == REDIS_ZSET) { + zset *zs = o->ptr; + dictIterator *di = dictGetIterator(zs->dict); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetEntryKey(de); + double *score = dictGetEntryVal(de); + unsigned char eledigest[20]; + + 
snprintf(buf,sizeof(buf),"%.17g",*score); + memset(eledigest,0,20); + mixObjectDigest(eledigest,eleobj); + mixDigest(eledigest,buf,strlen(buf)); + xorDigest(digest,eledigest,20); + } + dictReleaseIterator(di); + } else if (o->type == REDIS_HASH) { + hashIterator *hi; + robj *obj; + + hi = hashInitIterator(o); + while (hashNext(hi) != REDIS_ERR) { + unsigned char eledigest[20]; + + memset(eledigest,0,20); + obj = hashCurrent(hi,REDIS_HASH_KEY); + mixObjectDigest(eledigest,obj); + decrRefCount(obj); + obj = hashCurrent(hi,REDIS_HASH_VALUE); + mixObjectDigest(eledigest,obj); + decrRefCount(obj); + xorDigest(digest,eledigest,20); + } + hashReleaseIterator(hi); + } else { + redisPanic("Unknown object type"); + } + /* If the key has an expire, add it to the mix */ + if (expiretime != -1) xorDigest(digest,"!!expire!!",10); + /* We can finally xor the key-val digest to the final digest */ + xorDigest(final,digest,20); + decrRefCount(keyobj); + } + dictReleaseIterator(di); + } +} + static void debugCommand(redisClient *c) { if (!strcasecmp(c->argv[1]->ptr,"segfault")) { *((char*)-1) = 'x'; @@ -9827,17 +11169,16 @@ static void debugCommand(redisClient *c) { redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF"); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) { - dictEntry *de = dictFind(c->db->dict,c->argv[2]); - robj *key, *val; + dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr); + robj *val; if (!de) { addReply(c,shared.nokeyerr); return; } - key = dictGetEntryKey(de); val = dictGetEntryVal(de); - if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING)) { + if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY || + val->storage == REDIS_VM_SWAPPING)) { char *strenc; char buf[128]; @@ -9848,23 +11189,25 @@ static void debugCommand(redisClient *c) { strenc = buf; } addReplySds(c,sdscatprintf(sdsempty(), - "+Key at:%p refcount:%d, value at:%p refcount:%d " + "+Value 
at:%p refcount:%d " "encoding:%s serializedlength:%lld\r\n", - (void*)key, key->refcount, (void*)val, val->refcount, + (void*)val, val->refcount, strenc, (long long) rdbSavedObjectLen(val,NULL))); } else { + vmpointer *vp = (vmpointer*) val; addReplySds(c,sdscatprintf(sdsempty(), - "+Key at:%p refcount:%d, value swapped at: page %llu " + "+Value swapped at: page %llu " "using %llu pages\r\n", - (void*)key, key->refcount, (unsigned long long) key->vm.page, - (unsigned long long) key->vm.usedpages)); + (unsigned long long) vp->page, + (unsigned long long) vp->usedpages)); } } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) { lookupKeyRead(c->db,c->argv[2]); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) { - dictEntry *de = dictFind(c->db->dict,c->argv[2]); - robj *key, *val; + dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr); + robj *val; + vmpointer *vp; if (!server.vm_enabled) { addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n")); @@ -9874,23 +11217,49 @@ static void debugCommand(redisClient *c) { addReply(c,shared.nokeyerr); return; } - key = dictGetEntryKey(de); val = dictGetEntryVal(de); - /* If the key is shared we want to create a copy */ - if (key->refcount > 1) { - robj *newkey = dupStringObject(key); - decrRefCount(key); - key = dictGetEntryKey(de) = newkey; - } /* Swap it */ - if (key->storage != REDIS_VM_MEMORY) { + if (val->storage != REDIS_VM_MEMORY) { addReplySds(c,sdsnew("-ERR This key is not in memory\r\n")); - } else if (vmSwapObjectBlocking(key,val) == REDIS_OK) { - dictGetEntryVal(de) = NULL; + } else if (val->refcount != 1) { + addReplySds(c,sdsnew("-ERR Object is shared\r\n")); + } else if ((vp = vmSwapObjectBlocking(val)) != NULL) { + dictGetEntryVal(de) = vp; addReply(c,shared.ok); } else { addReply(c,shared.err); } + } else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) { + long keys, j; + robj *key, *val; + char buf[128]; + + if 
(getLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != REDIS_OK) + return; + for (j = 0; j < keys; j++) { + snprintf(buf,sizeof(buf),"key:%lu",j); + key = createStringObject(buf,strlen(buf)); + if (lookupKeyRead(c->db,key) != NULL) { + decrRefCount(key); + continue; + } + snprintf(buf,sizeof(buf),"value:%lu",j); + val = createStringObject(buf,strlen(buf)); + dbAdd(c->db,key,val); + decrRefCount(key); + } + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) { + unsigned char digest[20]; + sds d = sdsnew("+"); + int j; + + computeDatasetDigest(digest); + for (j = 0; j < 20; j++) + d = sdscatprintf(d, "%02x",digest[j]); + + d = sdscatlen(d,"\r\n",2); + addReplySds(c,d); } else { addReplySds(c,sdsnew( "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT |SWAPIN |SWAPOUT |RELOAD]\r\n")); @@ -9899,7 +11268,7 @@ static void debugCommand(redisClient *c) { static void _redisAssert(char *estr, char *file, int line) { redisLog(REDIS_WARNING,"=== ASSERTION FAILED ==="); - redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true\n",file,line,estr); + redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true",file,line,estr); #ifdef HAVE_BACKTRACE redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)"); *((char*)-1) = 'x'; @@ -9934,7 +11303,7 @@ int linuxOvercommitMemoryValue(void) { void linuxOvercommitMemoryWarning(void) { if (linuxOvercommitMemoryValue() == 0) { - redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); + redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. 
To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); } } #endif /* __linux__ */ @@ -9964,7 +11333,8 @@ static void daemonize(void) { } static void version() { - printf("Redis server version %s\n", REDIS_VERSION); + printf("Redis server version %s (%s:%d)\n", REDIS_VERSION, + REDIS_GIT_SHA1, atoi(REDIS_GIT_DIRTY) > 0); exit(0); } @@ -9978,6 +11348,7 @@ int main(int argc, char **argv) { time_t start; initServerConfig(); + sortCommandTable(); if (argc == 2) { if (strcmp(argv[1], "-v") == 0 || strcmp(argv[1], "--version") == 0) version(); @@ -10078,6 +11449,13 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) { _exit(0); } +static void sigtermHandler(int sig) { + REDIS_NOTUSED(sig); + + redisLog(REDIS_WARNING,"SIGTERM received, scheduling shutting down..."); + server.shutdown_asap = 1; +} + static void setupSigSegvAction(void) { struct sigaction act; @@ -10091,6 +11469,10 @@ static void setupSigSegvAction(void) { sigaction (SIGFPE, &act, NULL); sigaction (SIGILL, &act, NULL); sigaction (SIGBUS, &act, NULL); + + act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND; + act.sa_handler = sigtermHandler; + sigaction (SIGTERM, &act, NULL); return; }