X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/168ac5c6e3c9bf137fcebfd55948363564cbe434..d52e5888694cb65918f2b0d00691c198deb5643f:/redis.c diff --git a/redis.c b/redis.c index 03f90a02..f6a765da 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "1.3.7" +#define REDIS_VERSION "2.1.1" #include "fmacros.h" #include "config.h" @@ -37,8 +37,6 @@ #include #include #include -#define __USE_POSIX199309 -#define __USE_UNIX98 #include #ifdef HAVE_BACKTRACE @@ -59,6 +57,7 @@ #include #include #include +#include #include #include @@ -75,7 +74,9 @@ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ #include "lzf.h" /* LZF compression library */ #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ -#include "zipmap.h" +#include "zipmap.h" /* Compact dictionary-alike data structure */ +#include "ziplist.h" /* Compact list data structure */ +#include "sha1.h" /* SHA1 is used for DEBUG DIGEST */ /* Error codes */ #define REDIS_OK 0 @@ -91,7 +92,7 @@ #define REDIS_CONFIGLINE_MAX 1024 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */ #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */ -#define REDIS_EXPIRELOOKUPS_PER_CRON 10 /* try to expire 10 keys/loop */ +#define REDIS_EXPIRELOOKUPS_PER_CRON 10 /* lookup 10 expires per loop */ #define REDIS_MAX_WRITE_PER_EVENT (1024*64) #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */ @@ -111,6 +112,7 @@ config file and the server is using more than maxmemory bytes of memory. In short this commands are denied on low memory conditions. */ #define REDIS_CMD_DENYOOM 4 +#define REDIS_CMD_FORCE_REPLICATION 8 /* Force replication even if dirty is 0 */ /* Object types */ #define REDIS_STRING 0 @@ -118,17 +120,20 @@ #define REDIS_SET 2 #define REDIS_ZSET 3 #define REDIS_HASH 4 +#define REDIS_VMPOINTER 8 /* Objects encoding. 
Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. */ -#define REDIS_ENCODING_RAW 0 /* Raw representation */ -#define REDIS_ENCODING_INT 1 /* Encoded as integer */ -#define REDIS_ENCODING_ZIPMAP 2 /* Encoded as zipmap */ -#define REDIS_ENCODING_HT 3 /* Encoded as an hash table */ +#define REDIS_ENCODING_RAW 0 /* Raw representation */ +#define REDIS_ENCODING_INT 1 /* Encoded as integer */ +#define REDIS_ENCODING_HT 2 /* Encoded as hash table */ +#define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ +#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */ +#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ static char* strencoding[] = { - "raw", "int", "zipmap", "hashtable" + "raw", "int", "hashtable", "zipmap", "linkedlist", "ziplist" }; /* Object types only used for dumping to disk */ @@ -188,6 +193,7 @@ static char* strencoding[] = { #define REDIS_MULTI 8 /* This client is in a MULTI context */ #define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */ #define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ +#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. 
*/ /* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ @@ -230,42 +236,57 @@ static char* strencoding[] = { #define APPENDFSYNC_ALWAYS 1 #define APPENDFSYNC_EVERYSEC 2 -/* Hashes related defaults */ +/* Zip structure related defaults */ #define REDIS_HASH_MAX_ZIPMAP_ENTRIES 64 #define REDIS_HASH_MAX_ZIPMAP_VALUE 512 +#define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024 +#define REDIS_LIST_MAX_ZIPLIST_VALUE 32 /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) +#define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1) static void _redisAssert(char *estr, char *file, int line); +static void _redisPanic(char *msg, char *file, int line); /*================================= Data types ============================== */ /* A redis object, that is a type able to hold a string / list / set */ -/* The VM object structure */ -struct redisObjectVM { - off_t page; /* the page at witch the object is stored on disk */ - off_t usedpages; /* number of pages used on disk */ - time_t atime; /* Last access time */ -} vm; - /* The actual Redis Object */ typedef struct redisObject { - void *ptr; - unsigned char type; - unsigned char encoding; - unsigned char storage; /* If this object is a key, where is the value? - * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */ - unsigned char vtype; /* If this object is a key, and value is swapped out, - * this is the type of the swapped out object. 
*/ + unsigned type:4; + unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */ + unsigned encoding:4; + unsigned lru:22; /* lru time (relative to server.lruclock) */ int refcount; - /* VM fields, this are only allocated if VM is active, otherwise the + void *ptr; + /* VM fields are only allocated if VM is active, otherwise the * object allocation function will just allocate * sizeof(redisObjct) minus sizeof(redisObjectVM), so using * Redis without VM active will not have any overhead. */ - struct redisObjectVM vm; } robj; +/* The VM pointer structure - identifies an object in the swap file. + * + * This object is stored in place of the value + * object in the main key->value hash table representing a database. + * Note that the first fields (type, storage) are the same as the redisObject + * structure so that vmPointer strucuters can be accessed even when casted + * as redisObject structures. + * + * This is useful as we don't know if a value object is or not on disk, but we + * are always able to read obj->storage to check this. For vmPointer + * structures "type" is set to REDIS_VMPOINTER (even if without this field + * is still possible to check the kind of object from the value of 'storage').*/ +typedef struct vmPointer { + unsigned type:4; + unsigned storage:2; /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ + unsigned notused:26; + unsigned int vtype; /* type of the object stored in the swap file */ + off_t page; /* the page at witch the object is stored on disk */ + off_t usedpages; /* number of pages used on disk */ +} vmpointer; + /* Macro used to initalize a Redis object allocated on the stack. 
* Note that this macro is taken near the structure definition to make sure * we'll update it when the structure is changed, to avoid bugs like @@ -275,14 +296,15 @@ typedef struct redisObject { _var.type = REDIS_STRING; \ _var.encoding = REDIS_ENCODING_RAW; \ _var.ptr = _ptr; \ - if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \ + _var.storage = REDIS_VM_MEMORY; \ } while(0); typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ - dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */ + dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ dict *io_keys; /* Keys with clients waiting for VM I/O */ + dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -320,13 +342,16 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj **blockingkeys; /* The key we are waiting to terminate a blocking + robj **blocking_keys; /* The key we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ - int blockingkeysnum; /* Number of blocking keys */ + int blocking_keys_num; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time * is >= blockingto then the operation timed out. */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. 
*/ + list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ + dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ + list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ } redisClient; struct saveparam { @@ -339,8 +364,6 @@ struct redisServer { int port; int fd; redisDb *db; - dict *sharingpool; /* Poll used for object sharing */ - unsigned int sharingpoolsize; long long dirty; /* changes to DB from the last save */ list *clients; list *slaves, *monitors; @@ -362,6 +385,8 @@ struct redisServer { int daemonize; int appendonly; int appendfsync; + int no_appendfsync_on_rewrite; + int shutdown_asap; time_t lastfsync; int appendfd; int appendseldb; @@ -369,6 +394,7 @@ struct redisServer { pid_t bgsavechildpid; pid_t bgrewritechildpid; sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ + sds aofbuf; /* AOF buffer, written before entering the event loop */ struct saveparam *saveparams; int saveparamslen; char *logfile; @@ -376,8 +402,8 @@ struct redisServer { char *dbfilename; char *appendfilename; char *requirepass; - int shareobjects; int rdbcompression; + int activerehashing; /* Replication related */ int isslave; char *masterauth; @@ -400,9 +426,11 @@ struct redisServer { off_t vm_page_size; off_t vm_pages; unsigned long long vm_max_memory; - /* Hashes config */ + /* Zip structure config */ size_t hash_max_zipmap_entries; size_t hash_max_zipmap_value; + size_t list_max_ziplist_entries; + size_t list_max_ziplist_value; /* Virtual memory state */ FILE *vm_fp; int vm_fd; @@ -435,10 +463,22 @@ struct redisServer { unsigned long long vm_stats_swapped_objects; unsigned long long vm_stats_swapouts; unsigned long long vm_stats_swapins; + /* Pubsub */ + dict *pubsub_channels; /* Map channels to list of subscribed clients */ + list *pubsub_patterns; /* A list of pubsub_patterns */ + /* Misc */ FILE *devnull; + unsigned lruclock:22; /* clock incrementing every minute, for LRU */ + unsigned 
lruclock_padding:10; }; +typedef struct pubsubPattern { + redisClient *client; + robj *pattern; +} pubsubPattern; + typedef void redisCommandProc(redisClient *c); +typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); struct redisCommand { char *name; redisCommandProc *proc; @@ -447,7 +487,7 @@ struct redisCommand { /* Use a function to determine which keys need to be loaded * in the background prior to executing this command. Takes precedence * over vm_firstkey and others, ignored when NULL */ - redisCommandProc *vm_preload_proc; + redisVmPreloadProc *vm_preload_proc; /* What keys should be loaded in background when calling this command? */ int vm_firstkey; /* The first argument that's a key (0 = no keys) */ int vm_lastkey; /* THe last argument that's a key */ @@ -495,13 +535,17 @@ typedef struct zset { /* Our shared "common" objects */ +#define REDIS_SHARED_INTEGERS 10000 struct sharedObjectsStruct { - robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space, + robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space, *colon, *nullbulk, *nullmultibulk, *queued, *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, *outofrangeerr, *plus, *select0, *select1, *select2, *select3, *select4, - *select5, *select6, *select7, *select8, *select9; + *select5, *select6, *select7, *select8, *select9, + *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3, + *mbulk4, *psubscribebulk, *punsubscribebulk, + *integers[REDIS_SHARED_INTEGERS]; } shared; /* Global vars that are actally used as constants. The following double @@ -518,6 +562,9 @@ typedef struct iojob { int type; /* Request type, REDIS_IOJOB_* */ redisDb *db;/* Redis database */ robj *key; /* This I/O request is about swapping this key */ + robj *id; /* Unique identifier of this job: + this is the object to swap for REDIS_IOREQ_*_SWAP, or the + vmpointer objct for REDIS_IOREQ_LOAD. 
*/ robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */ off_t page; /* Swap page where to read/write the object */ @@ -527,6 +574,8 @@ typedef struct iojob { } iojob; /*================================ Prototypes =============================== */ +char *redisGitSHA1(void); +char *redisGitDirty(void); static void freeStringObject(robj *o); static void freeListObject(robj *o); @@ -542,16 +591,16 @@ static int rdbSaveBackground(char *filename); static robj *createStringObject(char *ptr, size_t len); static robj *dupStringObject(robj *o); static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); +static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc); +static void flushAppendOnlyFile(void); static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); -static robj *tryObjectSharing(robj *o); -static int tryObjectEncoding(robj *o); +static robj *tryObjectEncoding(robj *o); static robj *getDecodedObject(robj *o); static int removeExpire(redisDb *db, robj *key); static int expireIfNeeded(redisDb *db, robj *key); static int deleteIfVolatile(redisDb *db, robj *key); -static int deleteIfSwapped(redisDb *db, robj *key); -static int deleteKey(redisDb *db, robj *key); +static int dbDelete(redisDb *db, robj *key); static time_t getExpire(redisDb *db, robj *key); static int setExpire(redisDb *db, robj *key, time_t when); static void updateSlavesWaitingBgsave(int bgsaveerr); @@ -573,8 +622,8 @@ static void unblockClientWaitingData(redisClient *c); static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele); static void vmInit(void); static void vmMarkPagesFree(off_t page, off_t count); -static robj *vmLoadObject(robj *key); -static robj *vmPreviewObject(robj *key); +static robj *vmLoadObject(robj *o); +static robj *vmPreviewObject(robj *o); static int 
vmSwapOneObjectBlocking(void); static int vmSwapOneObjectThreaded(void); static int vmCanSwapOut(void); @@ -592,8 +641,9 @@ static robj *vmReadObjectFromSwap(off_t page, int type); static void waitEmptyIOJobsQueue(void); static void vmReopenSwapFile(void); static int vmFreePage(off_t page); -static void zunionInterBlockClientOnSwappedKeys(redisClient *c); -static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c); +static void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); +static void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); +static int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); static int dontWaitForSwappedKey(redisClient *c, robj *key); static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); @@ -601,12 +651,27 @@ static struct redisCommand *lookupCommand(char *name); static void call(redisClient *c, struct redisCommand *cmd); static void resetClient(redisClient *c); static void convertToRealHash(robj *o); +static void listTypeConvert(robj *o, int enc); +static int pubsubUnsubscribeAllChannels(redisClient *c, int notify); +static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify); +static void freePubsubPattern(void *p); +static int listMatchPubsubPattern(void *a, void *b); +static int compareStringObjects(robj *a, robj *b); +static int equalStringObjects(robj *a, robj *b); +static void usage(); +static int rewriteAppendOnlyFileBackground(void); +static vmpointer *vmSwapObjectBlocking(robj *val); +static int prepareForShutdown(); +static void touchWatchedKey(redisDb *db, robj *key); +static void touchWatchedKeysOnFlush(int dbid); +static void unwatchAllKeys(redisClient *c); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); static void echoCommand(redisClient *c); static void 
setCommand(redisClient *c); static void setnxCommand(redisClient *c); +static void setexCommand(redisClient *c); static void getCommand(redisClient *c); static void delCommand(redisClient *c); static void existsCommand(redisClient *c); @@ -628,6 +693,9 @@ static void renameCommand(redisClient *c); static void renamenxCommand(redisClient *c); static void lpushCommand(redisClient *c); static void rpushCommand(redisClient *c); +static void lpushxCommand(redisClient *c); +static void rpushxCommand(redisClient *c); +static void linsertCommand(redisClient *c); static void lpopCommand(redisClient *c); static void rpopCommand(redisClient *c); static void llenCommand(redisClient *c); @@ -686,26 +754,39 @@ static void substrCommand(redisClient *c); static void zrankCommand(redisClient *c); static void zrevrankCommand(redisClient *c); static void hsetCommand(redisClient *c); +static void hsetnxCommand(redisClient *c); static void hgetCommand(redisClient *c); +static void hmsetCommand(redisClient *c); +static void hmgetCommand(redisClient *c); static void hdelCommand(redisClient *c); static void hlenCommand(redisClient *c); static void zremrangebyrankCommand(redisClient *c); -static void zunionCommand(redisClient *c); -static void zinterCommand(redisClient *c); +static void zunionstoreCommand(redisClient *c); +static void zinterstoreCommand(redisClient *c); static void hkeysCommand(redisClient *c); static void hvalsCommand(redisClient *c); static void hgetallCommand(redisClient *c); static void hexistsCommand(redisClient *c); static void configCommand(redisClient *c); +static void hincrbyCommand(redisClient *c); +static void subscribeCommand(redisClient *c); +static void unsubscribeCommand(redisClient *c); +static void psubscribeCommand(redisClient *c); +static void punsubscribeCommand(redisClient *c); +static void publishCommand(redisClient *c); +static void watchCommand(redisClient *c); +static void unwatchCommand(redisClient *c); /*================================= Globals 
================================= */ /* Global vars */ static struct redisServer server; /* server global state */ -static struct redisCommand cmdTable[] = { +static struct redisCommand *commandTable; +static struct redisCommand readonlyCommandTable[] = { {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, + {"setex",setexCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"substr",substrCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, {"del",delCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, @@ -715,6 +796,9 @@ static struct redisCommand cmdTable[] = { {"mget",mgetCommand,-2,REDIS_CMD_INLINE,NULL,1,-1,1}, {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"rpushx",rpushxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"lpushx",lpushxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"linsert",linsertCommand,5,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"rpop",rpopCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"lpop",lpopCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"brpop",brpopCommand,-3,REDIS_CMD_INLINE,NULL,1,1,1}, @@ -745,8 +829,8 @@ static struct redisCommand cmdTable[] = { {"zrem",zremCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zremrangebyrank",zremrangebyrankCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zunion",zunionCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, - {"zinter",zinterCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, + {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, + 
{"zinterstore",zinterstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zcount",zcountCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, @@ -756,7 +840,11 @@ static struct redisCommand cmdTable[] = { {"zrank",zrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, {"zrevrank",zrevrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, {"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"hsetnx",hsetnxCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"hget",hgetCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, + {"hmset",hmsetCommand,-4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"hmget",hmgetCommand,-3,REDIS_CMD_BULK,NULL,1,1,1}, + {"hincrby",hincrbyCommand,4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"hdel",hdelCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, {"hlen",hlenCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"hkeys",hkeysCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, @@ -787,7 +875,7 @@ static struct redisCommand cmdTable[] = { {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"type",typeCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"multi",multiCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"exec",execCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, + {"exec",execCommand,1,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,execBlockClientOnSwappedKeys,0,0,0}, {"discard",discardCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"sync",syncCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, @@ -799,11 +887,15 @@ static struct redisCommand cmdTable[] = { {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0}, {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0}, - {NULL,NULL,0,0,NULL,0,0,0} + {"subscribe",subscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, + {"unsubscribe",unsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0}, + 
{"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, + {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0}, + {"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0}, + {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, + {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0} }; -static void usage(); - /*============================ Utility functions ============================ */ /* Glob-style pattern matching. */ @@ -933,6 +1025,77 @@ static int stringmatch(const char *pattern, const char *string, int nocase) { return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase); } +/* Convert a string representing an amount of memory into the number of + * bytes, so for instance memtoll("1Gi") will return 1073741824 that is + * (1024*1024*1024). + * + * On parsing error, if *err is not NULL, it's set to 1, otherwise it's + * set to 0 */ +static long long memtoll(const char *p, int *err) { + const char *u; + char buf[128]; + long mul; /* unit multiplier */ + long long val; + unsigned int digits; + + if (err) *err = 0; + /* Search the first non digit character. */ + u = p; + if (*u == '-') u++; + while(*u && isdigit(*u)) u++; + if (*u == '\0' || !strcasecmp(u,"b")) { + mul = 1; + } else if (!strcasecmp(u,"k")) { + mul = 1000; + } else if (!strcasecmp(u,"kb")) { + mul = 1024; + } else if (!strcasecmp(u,"m")) { + mul = 1000*1000; + } else if (!strcasecmp(u,"mb")) { + mul = 1024*1024; + } else if (!strcasecmp(u,"g")) { + mul = 1000L*1000*1000; + } else if (!strcasecmp(u,"gb")) { + mul = 1024L*1024*1024; + } else { + if (err) *err = 1; + mul = 1; + } + digits = u-p; + if (digits >= sizeof(buf)) { + if (err) *err = 1; + return LLONG_MAX; + } + memcpy(buf,p,digits); + buf[digits] = '\0'; + val = strtoll(buf,NULL,10); + return val*mul; +} + +/* Convert a long long into a string. 
Returns the number of + * characters needed to represent the number, that can be shorter if passed + * buffer length is not enough to store the whole number. */ +static int ll2string(char *s, size_t len, long long value) { + char buf[32], *p; + unsigned long long v; + size_t l; + + if (len == 0) return 0; + v = (value < 0) ? -value : value; + p = buf+31; /* point to the last character */ + do { + *p-- = '0'+(v%10); + v /= 10; + } while(v); + if (value < 0) *p-- = '-'; + p++; + l = 32-(p-buf); + if (l+1 > len) l = len-1; /* Make sure it fits, including the nul term */ + memcpy(s,p,l); + s[l] = '\0'; + return l; +} + static void redisLog(int level, const char *fmt, ...) { va_list ap; FILE *fp; @@ -976,7 +1139,7 @@ static void dictListDestructor(void *privdata, void *val) listRelease((list*)val); } -static int sdsDictKeyCompare(void *privdata, const void *key1, +static int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2) { int l1,l2; @@ -996,11 +1159,18 @@ static void dictRedisObjectDestructor(void *privdata, void *val) decrRefCount(val); } +static void dictSdsDestructor(void *privdata, void *val) +{ + DICT_NOTUSED(privdata); + + sdsfree(val); +} + static int dictObjKeyCompare(void *privdata, const void *key1, const void *key2) { const robj *o1 = key1, *o2 = key2; - return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr); + return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); } static unsigned int dictObjHash(const void *key) { @@ -1008,6 +1178,10 @@ static unsigned int dictObjHash(const void *key) { return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); } +static unsigned int dictSdsHash(const void *key) { + return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); +} + static int dictEncObjKeyCompare(void *privdata, const void *key1, const void *key2) { @@ -1015,12 +1189,12 @@ static int dictEncObjKeyCompare(void *privdata, const void *key1, int cmp; if (o1->encoding == REDIS_ENCODING_INT && - o2->encoding == REDIS_ENCODING_INT && - 
o1->ptr == o2->ptr) return 1; + o2->encoding == REDIS_ENCODING_INT) + return o1->ptr == o2->ptr; o1 = getDecodedObject(o1); o2 = getDecodedObject(o2); - cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr); + cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); decrRefCount(o1); decrRefCount(o2); return cmp; @@ -1036,7 +1210,7 @@ static unsigned int dictEncObjHash(const void *key) { char buf[32]; int len; - len = snprintf(buf,32,"%ld",(long)o->ptr); + len = ll2string(buf,32,(long)o->ptr); return dictGenHashFunction((unsigned char*)buf, len); } else { unsigned int hash; @@ -1049,7 +1223,7 @@ static unsigned int dictEncObjHash(const void *key) { } } -/* Sets type and expires */ +/* Sets type */ static dictType setDictType = { dictEncObjHash, /* hash function */ NULL, /* key dup */ @@ -1069,23 +1243,23 @@ static dictType zsetDictType = { dictVanillaFree /* val destructor of malloc(sizeof(double)) */ }; -/* Db->dict */ +/* Db->dict, keys are sds strings, vals are Redis objects. */ static dictType dbDictType = { - dictObjHash, /* hash function */ + dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictObjKeyCompare, /* key compare */ - dictRedisObjectDestructor, /* key destructor */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ dictRedisObjectDestructor /* val destructor */ }; /* Db->expires */ static dictType keyptrDictType = { - dictObjHash, /* hash function */ + dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictObjKeyCompare, /* key compare */ - dictRedisObjectDestructor, /* key destructor */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ NULL /* val destructor */ }; @@ -1139,7 +1313,9 @@ static void closeTimedoutClients(void) { if (server.maxidletime && !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ !(c->flags & REDIS_MASTER) && /* no timeout for masters */ - (now - c->lastinteraction > server.maxidletime)) + dictSize(c->pubsub_channels) == 0 && /* no 
timeout for pubsub */ + listLength(c->pubsub_patterns) == 0 && + (now - c->lastinteraction > server.maxidletime)) { redisLog(REDIS_VERBOSE,"Closing idle client"); freeClient(c); @@ -1167,16 +1343,28 @@ static void tryResizeHashTables(void) { int j; for (j = 0; j < server.dbnum; j++) { - if (htNeedsResize(server.db[j].dict)) { - redisLog(REDIS_VERBOSE,"The hash table %d is too sparse, resize it...",j); + if (htNeedsResize(server.db[j].dict)) dictResize(server.db[j].dict); - redisLog(REDIS_VERBOSE,"Hash table %d resized.",j); - } if (htNeedsResize(server.db[j].expires)) dictResize(server.db[j].expires); } } +/* Our hash table implementation performs rehashing incrementally while + * we write/read from the hash table. Still if the server is idle, the hash + * table will use two tables for a long time. So we try to use 1 millisecond + * of CPU time at every serverCron() loop in order to rehash some key. */ +static void incrementallyRehash(void) { + int j; + + for (j = 0; j < server.dbnum; j++) { + if (dictIsRehashing(server.db[j].dict)) { + dictRehashMilliseconds(server.db[j].dict,1); + break; /* already used our millisecond for this loop... */ + } + } +} + /* A background saving child (BGSAVE) terminated its work. Handle this. */ void backgroundSaveDoneHandler(int statloc) { int exitcode = WEXITSTATUS(statloc); @@ -1191,7 +1379,7 @@ void backgroundSaveDoneHandler(int statloc) { redisLog(REDIS_WARNING, "Background saving error"); } else { redisLog(REDIS_WARNING, - "Background saving terminated by signal"); + "Background saving terminated by signal %d", WTERMSIG(statloc)); rdbRemoveTempFile(server.bgsavechildpid); } server.bgsavechildpid = -1; @@ -1240,7 +1428,7 @@ void backgroundRewriteDoneHandler(int statloc) { /* If append only is actually enabled... 
*/ close(server.appendfd); server.appendfd = fd; - fsync(fd); + if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd); server.appendseldb = -1; /* Make sure it will issue SELECT */ redisLog(REDIS_NOTICE,"The new append only file was selected for future appends."); } else { @@ -1252,7 +1440,8 @@ void backgroundRewriteDoneHandler(int statloc) { redisLog(REDIS_WARNING, "Background append only file rewriting error"); } else { redisLog(REDIS_WARNING, - "Background append only file rewriting terminated by signal"); + "Background append only file rewriting terminated by signal %d", + WTERMSIG(statloc)); } cleanup: sdsfree(server.bgrewritebuf); @@ -1261,6 +1450,19 @@ cleanup: server.bgrewritechildpid = -1; } +/* This function is called once a background process of some kind terminates, + * as we want to avoid resizing the hash tables when there is a child in order + * to play well with copy-on-write (otherwise when a resize happens lots of + * memory pages are copied). The goal of this function is to update the ability + * for dict.c to resize the hash tables accordingly to the fact we have o not + * running childs. */ +static void updateDictResizePolicy(void) { + if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) + dictEnableResize(); + else + dictDisableResize(); +} + static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); @@ -1272,6 +1474,26 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD * in objects at every object access, and accuracy is not needed. * To access a global var is faster than calling time(NULL) */ server.unixtime = time(NULL); + /* We have just 21 bits per object for LRU information. + * So we use an (eventually wrapping) LRU clock with minutes resolution. 
+ * + * When we need to select what object to swap, we compute the minimum + * time distance between the current lruclock and the object last access + * lruclock info. Even if clocks will wrap on overflow, there is + * the interesting property that we are sure that at least + * ABS(A-B) minutes passed between current time and timestamp B. + * + * This is not precise but we don't need at all precision, but just + * something statistically reasonable. + */ + server.lruclock = (time(NULL)/60)&((1<<21)-1); + + /* We received a SIGTERM, shutting down here in a safe way, as it is + * not ok doing so inside the signal handler. */ + if (server.shutdown_asap) { + if (prepareForShutdown() == REDIS_OK) exit(0); + redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); + } /* Show some info about non-empty databases */ for (j = 0; j < server.dbnum; j++) { @@ -1292,15 +1514,17 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD * if we resize the HT while there is the saving child at work actually * a lot of memory movements in the parent will cause a lot of pages * copied. 
*/ - if (server.bgsavechildpid == -1 && !(loops % 10)) tryResizeHashTables(); + if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) { + if (!(loops % 10)) tryResizeHashTables(); + if (server.activerehashing) incrementallyRehash(); + } /* Show information about connected clients */ if (!(loops % 50)) { - redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects", + redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use", listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), - zmalloc_used_memory(), - dictSize(server.sharingpool)); + zmalloc_used_memory()); } /* Close connections of timedout clients */ @@ -1318,6 +1542,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD } else { backgroundRewriteDoneHandler(statloc); } + updateDictResizePolicy(); } } else { /* If there is not a background saving in progress check if @@ -1360,7 +1585,11 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD if ((de = dictGetRandomKey(db->expires)) == NULL) break; t = (time_t) dictGetEntryVal(de); if (now > t) { - deleteKey(db,dictGetEntryKey(de)); + sds key = dictGetEntryKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + + dbDelete(db,keyobj); + decrRefCount(keyobj); expired++; server.stat_expiredkeys++; } @@ -1399,6 +1628,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD redisLog(REDIS_NOTICE,"Connecting to MASTER..."); if (syncWithMaster() == REDIS_OK) { redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded"); + if (server.appendonly) rewriteAppendOnlyFileBackground(); } } return 100; @@ -1410,6 +1640,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD static void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); + /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && 
listLength(server.io_ready_clients)) { listIter li; listNode *ln; @@ -1434,15 +1665,20 @@ static void beforeSleep(struct aeEventLoop *eventLoop) { processInputBuffer(c); } } + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(); } static void createSharedObjects(void) { + int j; + shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n")); shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n")); shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n")); shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n")); shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n")); shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n")); + shared.cnegone = createObject(REDIS_STRING,sdsnew(":-1\r\n")); shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n")); shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n")); shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n")); @@ -1471,6 +1707,18 @@ static void createSharedObjects(void) { shared.select7 = createStringObject("select 7\r\n",10); shared.select8 = createStringObject("select 8\r\n",10); shared.select9 = createStringObject("select 9\r\n",10); + shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13); + shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14); + shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); + shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); + shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); + shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); + shared.mbulk3 = createStringObject("*3\r\n",4); + shared.mbulk4 = createStringObject("*4\r\n",4); + for (j = 0; j < REDIS_SHARED_INTEGERS; j++) { + shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j); + shared.integers[j]->encoding = REDIS_ENCODING_INT; + } } static void appendServerSaveParams(time_t seconds, int changes) { @@ -1497,7 +1745,8 @@ static void initServerConfig() { server.glueoutputbuf 
= 1; server.daemonize = 0; server.appendonly = 0; - server.appendfsync = APPENDFSYNC_ALWAYS; + server.appendfsync = APPENDFSYNC_EVERYSEC; + server.no_appendfsync_on_rewrite = 0; server.lastfsync = time(NULL); server.appendfd = -1; server.appendseldb = -1; /* Make sure the first time will not match */ @@ -1505,9 +1754,8 @@ static void initServerConfig() { server.dbfilename = zstrdup("dump.rdb"); server.appendfilename = zstrdup("appendonly.aof"); server.requirepass = NULL; - server.shareobjects = 0; server.rdbcompression = 1; - server.sharingpoolsize = 1024; + server.activerehashing = 1; server.maxclients = 0; server.blpop_blocked_clients = 0; server.maxmemory = 0; @@ -1520,6 +1768,9 @@ static void initServerConfig() { server.vm_blocked_clients = 0; server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES; server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE; + server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES; + server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE; + server.shutdown_asap = 0; resetServerSaveParams(); @@ -1560,7 +1811,6 @@ static void initServer() { createSharedObjects(); server.el = aeCreateEventLoop(); server.db = zmalloc(sizeof(redisDb)*server.dbnum); - server.sharingpool = dictCreate(&setDictType,NULL); server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); if (server.fd == -1) { redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr); @@ -1569,15 +1819,21 @@ static void initServer() { for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); - server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL); + server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); + server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); if (server.vm_enabled) server.db[j].io_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; } + server.pubsub_channels = dictCreate(&keylistDictType,NULL); + 
server.pubsub_patterns = listCreate(); + listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); + listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); server.cronloops = 0; server.bgsavechildpid = -1; server.bgrewritechildpid = -1; server.bgrewritebuf = sdsempty(); + server.aofbuf = sdsempty(); server.lastsave = time(NULL); server.dirty = 0; server.stat_numcommands = 0; @@ -1627,15 +1883,12 @@ static void loadServerConfig(char *filename) { char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL; int linenum = 0; sds line = NULL; - char *errormsg = "Fatal error, can't open config file '%s'"; - char *errorbuf = zmalloc(sizeof(char)*(strlen(errormsg)+strlen(filename))); - sprintf(errorbuf, errormsg, filename); if (filename[0] == '-' && filename[1] == '\0') fp = stdin; else { if ((fp = fopen(filename,"r")) == NULL) { - redisLog(REDIS_WARNING, errorbuf); + redisLog(REDIS_WARNING, "Fatal error, can't open config file '%s'", filename); exit(1); } } @@ -1722,7 +1975,7 @@ static void loadServerConfig(char *filename) { } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) { server.maxclients = atoi(argv[1]); } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) { - server.maxmemory = strtoll(argv[1], NULL, 10); + server.maxmemory = memtoll(argv[1],NULL); } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) { server.masterhost = sdsnew(argv[1]); server.masterport = atoi(argv[2]); @@ -1733,18 +1986,13 @@ static void loadServerConfig(char *filename) { if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) { - if ((server.shareobjects = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) { if ((server.rdbcompression = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if 
(!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) { - server.sharingpoolsize = atoi(argv[1]); - if (server.sharingpoolsize < 1) { - err = "invalid object sharing pool size"; goto loaderr; + } else if (!strcasecmp(argv[0],"activerehashing") && argc == 2) { + if ((server.activerehashing = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; } } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) { if ((server.daemonize = yesnotoi(argv[1])) == -1) { @@ -1754,6 +2002,14 @@ static void loadServerConfig(char *filename) { if ((server.appendonly = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) { + zfree(server.appendfilename); + server.appendfilename = zstrdup(argv[1]); + } else if (!strcasecmp(argv[0],"no-appendfsync-on-rewrite") + && argc == 2) { + if ((server.no_appendfsync_on_rewrite= yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) { if (!strcasecmp(argv[1],"no")) { server.appendfsync = APPENDFSYNC_NO; @@ -1781,19 +2037,21 @@ static void loadServerConfig(char *filename) { zfree(server.vm_swap_file); server.vm_swap_file = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) { - server.vm_max_memory = strtoll(argv[1], NULL, 10); + server.vm_max_memory = memtoll(argv[1],NULL); } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) { - server.vm_page_size = strtoll(argv[1], NULL, 10); + server.vm_page_size = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) { - server.vm_pages = strtoll(argv[1], NULL, 10); + server.vm_pages = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) { server.vm_max_threads = strtoll(argv[1], NULL, 10); } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2){ - server.hash_max_zipmap_entries 
= strtol(argv[1], NULL, 10); + server.hash_max_zipmap_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2){ - server.hash_max_zipmap_value = strtol(argv[1], NULL, 10); - } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) { - server.vm_max_threads = strtoll(argv[1], NULL, 10); + server.hash_max_zipmap_value = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){ + server.list_max_ziplist_entries = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2){ + server.list_max_ziplist_value = memtoll(argv[1], NULL); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -1837,6 +2095,15 @@ static void freeClient(redisClient *c) { if (c->flags & REDIS_BLOCKED) unblockClientWaitingData(c); + /* UNWATCH all the keys */ + unwatchAllKeys(c); + listRelease(c->watched_keys); + /* Unsubscribe from all the pubsub channels */ + pubsubUnsubscribeAllChannels(c,0); + pubsubUnsubscribeAllPatterns(c,0); + dictRelease(c->pubsub_channels); + listRelease(c->pubsub_patterns); + /* Obvious cleanup */ aeDeleteFileEvent(server.el,c->fd,AE_READABLE); aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); listRelease(c->reply); @@ -1846,7 +2113,8 @@ static void freeClient(redisClient *c) { ln = listSearchKey(server.clients,c); redisAssert(ln != NULL); listDelNode(server.clients,ln); - /* Remove from the list of clients waiting for swapped keys */ + /* Remove from the list of clients that are now ready to be restarted + * after waiting for swapped keys */ if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) { ln = listSearchKey(server.io_ready_clients,c); if (ln) { @@ -1854,12 +2122,13 @@ static void freeClient(redisClient *c) { server.vm_blocked_clients--; } } + /* Remove from the list of clients waiting for swapped keys */ while (server.vm_enabled && listLength(c->io_keys)) { ln = listFirst(c->io_keys); 
dontWaitForSwappedKey(c,ln->value); } listRelease(c->io_keys); - /* Other cleanup */ + /* Master/slave cleanup */ if (c->flags & REDIS_SLAVE) { if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) close(c->repldbfd); @@ -1872,6 +2141,7 @@ static void freeClient(redisClient *c) { server.master = NULL; server.replstate = REDIS_REPL_CONNECT; } + /* Release memory */ zfree(c->argv); zfree(c->mbargv); freeClientMultiState(c); @@ -1915,7 +2185,7 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) /* Use writev() if we have enough buffers to send */ if (!server.glueoutputbuf && - listLength(c->reply) > REDIS_WRITEV_THRESHOLD && + listLength(c->reply) > REDIS_WRITEV_THRESHOLD && !(c->flags & REDIS_MASTER)) { sendReplyToClientWritev(el, fd, privdata, mask); @@ -1993,7 +2263,7 @@ static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int o = listNodeValue(node); objlen = sdslen(o->ptr); - if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT) + if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT) break; if(ion == REDIS_WRITEV_IOVEC_COUNT) @@ -2041,7 +2311,7 @@ static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int } } - if (totwritten > 0) + if (totwritten > 0) c->lastinteraction = time(NULL); if (listLength(c->reply) == 0) { @@ -2050,13 +2320,29 @@ static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int } } +static int qsortRedisCommands(const void *r1, const void *r2) { + return strcasecmp( + ((struct redisCommand*)r1)->name, + ((struct redisCommand*)r2)->name); +} + +static void sortCommandTable() { + /* Copy and sort the read-only version of the command table */ + commandTable = (struct redisCommand*)malloc(sizeof(readonlyCommandTable)); + memcpy(commandTable,readonlyCommandTable,sizeof(readonlyCommandTable)); + qsort(commandTable, + sizeof(readonlyCommandTable)/sizeof(struct redisCommand), + sizeof(struct redisCommand),qsortRedisCommands); 
+} + static struct redisCommand *lookupCommand(char *name) { - int j = 0; - while(cmdTable[j].name != NULL) { - if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j]; - j++; - } - return NULL; + struct redisCommand tmp = {name,NULL,0,0,NULL,0,0,0}; + return bsearch( + &tmp, + commandTable, + sizeof(readonlyCommandTable)/sizeof(struct redisCommand), + sizeof(struct redisCommand), + qsortRedisCommands); } /* resetClient prepare the client to process the next command */ @@ -2072,12 +2358,15 @@ static void call(redisClient *c, struct redisCommand *cmd) { dirty = server.dirty; cmd->proc(c); - if (server.appendonly && server.dirty-dirty) + dirty = server.dirty-dirty; + + if (server.appendonly && dirty) feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc); - if (server.dirty-dirty && listLength(server.slaves)) + if ((dirty || cmd->flags & REDIS_CMD_FORCE_REPLICATION) && + listLength(server.slaves)) replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc); if (listLength(server.monitors)) - replicationFeedSlaves(server.monitors,c->db->id,c->argv,c->argc); + replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); server.stat_numcommands++; } @@ -2215,15 +2504,9 @@ static int processCommand(redisClient *c) { return 1; } } - /* Let's try to share objects on the command arguments vector */ - if (server.shareobjects) { - int j; - for(j = 1; j < c->argc; j++) - c->argv[j] = tryObjectSharing(c->argv[j]); - } /* Let's try to encode the bulk object to save space. 
*/ if (cmd->flags & REDIS_CMD_BULK) - tryObjectEncoding(c->argv[c->argc-1]); + c->argv[c->argc-1] = tryObjectEncoding(c->argv[c->argc-1]); /* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { @@ -2241,13 +2524,26 @@ static int processCommand(redisClient *c) { return 1; } + /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ + if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0) + && + cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand && + cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) { + addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n")); + resetClient(c); + return 1; + } + /* Exec the command */ - if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) { + if (c->flags & REDIS_MULTI && + cmd->proc != execCommand && cmd->proc != discardCommand && + cmd->proc != multiCommand && cmd->proc != watchCommand) + { queueMultiCommand(c,cmd); addReply(c,shared.queued); } else { if (server.vm_enabled && server.vm_max_threads > 0 && - blockClientOnSwappedKeys(cmd,c)) return 1; + blockClientOnSwappedKeys(c,cmd)) return 1; call(c,cmd); } @@ -2328,6 +2624,64 @@ static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int arg if (outv != static_outv) zfree(outv); } +static sds sdscatrepr(sds s, char *p, size_t len) { + s = sdscatlen(s,"\"",1); + while(len--) { + switch(*p) { + case '\\': + case '"': + s = sdscatprintf(s,"\\%c",*p); + break; + case '\n': s = sdscatlen(s,"\\n",1); break; + case '\r': s = sdscatlen(s,"\\r",1); break; + case '\t': s = sdscatlen(s,"\\t",1); break; + case '\a': s = sdscatlen(s,"\\a",1); break; + case '\b': s = sdscatlen(s,"\\b",1); break; + default: + if (isprint(*p)) + s = sdscatprintf(s,"%c",*p); + else + s = sdscatprintf(s,"\\x%02x",(unsigned char)*p); + break; + } + p++; + } + return sdscatlen(s,"\"",1); +} + +static 
void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) { + listNode *ln; + listIter li; + int j; + sds cmdrepr = sdsnew("+"); + robj *cmdobj; + struct timeval tv; + + gettimeofday(&tv,NULL); + cmdrepr = sdscatprintf(cmdrepr,"%ld.%ld ",(long)tv.tv_sec,(long)tv.tv_usec); + if (dictid != 0) cmdrepr = sdscatprintf(cmdrepr,"(db %d) ", dictid); + + for (j = 0; j < argc; j++) { + if (argv[j]->encoding == REDIS_ENCODING_INT) { + cmdrepr = sdscatprintf(cmdrepr, "%ld", (long)argv[j]->ptr); + } else { + cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, + sdslen(argv[j]->ptr)); + } + if (j != argc-1) + cmdrepr = sdscatlen(cmdrepr," ",1); + } + cmdrepr = sdscatlen(cmdrepr,"\r\n",2); + cmdobj = createObject(REDIS_STRING,cmdrepr); + + listRewind(monitors,&li); + while((ln = listNext(&li))) { + redisClient *monitor = ln->value; + addReply(monitor,cmdobj); + } + decrRefCount(cmdobj); +} + static void processInputBuffer(redisClient *c) { again: /* Before to process the input buffer, make sure the client is not @@ -2345,7 +2699,7 @@ again: if (p) { sds query, *argv; int argc, j; - + query = c->querybuf; c->querybuf = sdsempty(); querylen = 1+(p-(query)); @@ -2452,6 +2806,10 @@ static void *dupClientReplyValue(void *o) { return o; } +static int listMatchObjects(void *a, void *b) { + return equalStringObjects(a,b); +} + static redisClient *createClient(int fd) { redisClient *c = zmalloc(sizeof(*c)); @@ -2475,10 +2833,15 @@ static redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blockingkeys = NULL; - c->blockingkeysnum = 0; + c->blocking_keys = NULL; + c->blocking_keys_num = 0; c->io_keys = listCreate(); + c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); + c->pubsub_channels = dictCreate(&setDictType,NULL); + c->pubsub_patterns = listCreate(); + listSetFreeMethod(c->pubsub_patterns,decrRefCount); + 
listSetMatchMethod(c->pubsub_patterns,listMatchObjects); if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c) == AE_ERR) { freeClient(c); @@ -2517,19 +2880,22 @@ static void addReplyDouble(redisClient *c, double d) { (unsigned long) strlen(buf),buf)); } -static void addReplyLong(redisClient *c, long l) { +static void addReplyLongLong(redisClient *c, long long ll) { char buf[128]; size_t len; - if (l == 0) { + if (ll == 0) { addReply(c,shared.czero); return; - } else if (l == 1) { + } else if (ll == 1) { addReply(c,shared.cone); return; } - len = snprintf(buf,sizeof(buf),":%ld\r\n",l); - addReplySds(c,sdsnewlen(buf,len)); + buf[0] = ':'; + len = ll2string(buf+1,sizeof(buf)-1,ll); + buf[len+1] = '\r'; + buf[len+2] = '\n'; + addReplySds(c,sdsnewlen(buf,len+3)); } static void addReplyUlong(redisClient *c, unsigned long ul) { @@ -2548,7 +2914,8 @@ static void addReplyUlong(redisClient *c, unsigned long ul) { } static void addReplyBulkLen(redisClient *c, robj *obj) { - size_t len; + size_t len, intlen; + char buf[128]; if (obj->encoding == REDIS_ENCODING_RAW) { len = sdslen(obj->ptr); @@ -2565,7 +2932,11 @@ static void addReplyBulkLen(redisClient *c, robj *obj) { len++; } } - addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len)); + buf[0] = '$'; + intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len); + buf[intlen+1] = '\r'; + buf[intlen+2] = '\n'; + addReplySds(c,sdsnewlen(buf,intlen+3)); } static void addReplyBulk(redisClient *c, robj *obj) { @@ -2574,6 +2945,12 @@ static void addReplyBulk(redisClient *c, robj *obj) { addReply(c,shared.crlf); } +static void addReplyBulkSds(redisClient *c, sds s) { + robj *o = createStringObject(s, sdslen(s)); + addReplyBulk(c,o); + decrRefCount(o); +} + /* In the CONFIG command we need to add vanilla C string as bulk replies */ static void addReplyBulkCString(redisClient *c, char *s) { if (s == NULL) { @@ -2633,12 +3010,9 @@ static robj *createObject(int type, void *ptr) { 
listDelNode(server.objfreelist,head); if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); } else { - if (server.vm_enabled) { + if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); - o = zmalloc(sizeof(*o)); - } else { - o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM)); - } + o = zmalloc(sizeof(*o)); } o->type = type; o->encoding = REDIS_ENCODING_RAW; @@ -2646,10 +3020,10 @@ static robj *createObject(int type, void *ptr) { o->refcount = 1; if (server.vm_enabled) { /* Note that this code may run in the context of an I/O thread - * and accessing to server.unixtime in theory is an error + * and accessing server.lruclock in theory is an error * (no locks). But in practice this is safe, and even if we read - * garbage Redis will not fail, as it's just a statistical info */ - o->vm.atime = server.unixtime; + * garbage Redis will not fail. */ + o->lru = server.lruclock; o->storage = REDIS_VM_MEMORY; } return o; @@ -2659,6 +3033,23 @@ static robj *createStringObject(char *ptr, size_t len) { return createObject(REDIS_STRING,sdsnewlen(ptr,len)); } +static robj *createStringObjectFromLongLong(long long value) { + robj *o; + if (value >= 0 && value < REDIS_SHARED_INTEGERS) { + incrRefCount(shared.integers[value]); + o = shared.integers[value]; + } else { + if (value >= LONG_MIN && value <= LONG_MAX) { + o = createObject(REDIS_STRING, NULL); + o->encoding = REDIS_ENCODING_INT; + o->ptr = (void*)((long)value); + } else { + o = createObject(REDIS_STRING,sdsfromlonglong(value)); + } + } + return o; +} + static robj *dupStringObject(robj *o) { assert(o->encoding == REDIS_ENCODING_RAW); return createStringObject(o->ptr,sdslen(o->ptr)); @@ -2666,9 +3057,17 @@ static robj *dupStringObject(robj *o) { static robj *createListObject(void) { list *l = listCreate(); - + robj *o = createObject(REDIS_LIST,l); listSetFreeMethod(l,decrRefCount); - return createObject(REDIS_LIST,l); + o->encoding = REDIS_ENCODING_LINKEDLIST; + return o; +} + +static 
robj *createZiplistObject(void) { + unsigned char *zl = ziplistNew(); + robj *o = createObject(REDIS_LIST,zl); + o->encoding = REDIS_ENCODING_ZIPLIST; + return o; } static robj *createSetObject(void) { @@ -2701,7 +3100,16 @@ static void freeStringObject(robj *o) { } static void freeListObject(robj *o) { - listRelease((list*) o->ptr); + switch (o->encoding) { + case REDIS_ENCODING_LINKEDLIST: + listRelease((list*) o->ptr); + break; + case REDIS_ENCODING_ZIPLIST: + zfree(o->ptr); + break; + default: + redisPanic("Unknown list encoding type"); + } } static void freeSetObject(robj *o) { @@ -2725,50 +3133,50 @@ static void freeHashObject(robj *o) { zfree(o->ptr); break; default: - redisAssert(0); + redisPanic("Unknown hash encoding type"); break; } } static void incrRefCount(robj *o) { - redisAssert(!server.vm_enabled || o->storage == REDIS_VM_MEMORY); o->refcount++; } static void decrRefCount(void *obj) { robj *o = obj; - /* Object is a key of a swapped out value, or in the process of being - * loaded. */ + /* Object is a swapped out value, or in the process of being loaded. */ if (server.vm_enabled && (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING)) { - if (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING) { - redisAssert(o->refcount == 1); - } - if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(obj); - redisAssert(o->type == REDIS_STRING); - freeStringObject(o); - vmMarkPagesFree(o->vm.page,o->vm.usedpages); - pthread_mutex_lock(&server.obj_freelist_mutex); - if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX || - !listAddNodeHead(server.objfreelist,o)) - zfree(o); - pthread_mutex_unlock(&server.obj_freelist_mutex); + vmpointer *vp = obj; + if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(o); + vmMarkPagesFree(vp->page,vp->usedpages); server.vm_stats_swapped_objects--; + zfree(vp); return; } - /* Object is in memory, or in the process of being swapped out. 
*/ + + if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0"); + /* Object is in memory, or in the process of being swapped out. + * + * If the object is being swapped out, abort the operation on + * decrRefCount even if the refcount does not drop to 0: the object + * is referenced at least two times, as value of the key AND as + * job->val in the iojob. So if we don't invalidate the iojob, when it is + * done but the relevant key was removed in the meantime, the + * complete jobs handler will not find the key about the job and the + * assert will fail. */ + if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING) + vmCancelThreadedIOJob(o); if (--(o->refcount) == 0) { - if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING) - vmCancelThreadedIOJob(obj); switch(o->type) { case REDIS_STRING: freeStringObject(o); break; case REDIS_LIST: freeListObject(o); break; case REDIS_SET: freeSetObject(o); break; case REDIS_ZSET: freeZsetObject(o); break; case REDIS_HASH: freeHashObject(o); break; - default: redisAssert(0); break; + default: redisPanic("Unknown object type"); break; } if (server.vm_enabled) pthread_mutex_lock(&server.obj_freelist_mutex); if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX || @@ -2778,63 +3186,6 @@ static void decrRefCount(void *obj) { } } -static robj *lookupKey(redisDb *db, robj *key) { - dictEntry *de = dictFind(db->dict,key); - if (de) { - robj *key = dictGetEntryKey(de); - robj *val = dictGetEntryVal(de); - - if (server.vm_enabled) { - if (key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) - { - /* If we were swapping the object out, stop it, this key - * was requested. */ - if (key->storage == REDIS_VM_SWAPPING) - vmCancelThreadedIOJob(key); - /* Update the access time of the key for the aging algorithm. */ - key->vm.atime = server.unixtime; - } else { - int notify = (key->storage == REDIS_VM_LOADING); - - /* Our value was swapped on disk. Bring it at home. 
*/ - redisAssert(val == NULL); - val = vmLoadObject(key); - dictGetEntryVal(de) = val; - - /* Clients blocked by the VM subsystem may be waiting for - * this key... */ - if (notify) handleClientsBlockedOnSwappedKey(db,key); - } - } - return val; - } else { - return NULL; - } -} - -static robj *lookupKeyRead(redisDb *db, robj *key) { - expireIfNeeded(db,key); - return lookupKey(db,key); -} - -static robj *lookupKeyWrite(redisDb *db, robj *key) { - deleteIfVolatile(db,key); - return lookupKey(db,key); -} - -static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) { - robj *o = lookupKeyRead(c->db, key); - if (!o) addReply(c,reply); - return o; -} - -static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) { - robj *o = lookupKeyWrite(c->db, key); - if (!o) addReply(c,reply); - return o; -} - static int checkType(redisClient *c, robj *o, int type) { if (o->type != type) { addReply(c,shared.wrongtypeerr); @@ -2843,66 +3194,6 @@ static int checkType(redisClient *c, robj *o, int type) { return 0; } -static int deleteKey(redisDb *db, robj *key) { - int retval; - - /* We need to protect key from destruction: after the first dictDelete() - * it may happen that 'key' is no longer valid if we don't increment - * it's count. 
This may happen when we get the object reference directly - * from the hash table with dictRandomKey() or dict iterators */ - incrRefCount(key); - if (dictSize(db->expires)) dictDelete(db->expires,key); - retval = dictDelete(db->dict,key); - decrRefCount(key); - - return retval == DICT_OK; -} - -/* Try to share an object against the shared objects pool */ -static robj *tryObjectSharing(robj *o) { - struct dictEntry *de; - unsigned long c; - - if (o == NULL || server.shareobjects == 0) return o; - - redisAssert(o->type == REDIS_STRING); - de = dictFind(server.sharingpool,o); - if (de) { - robj *shared = dictGetEntryKey(de); - - c = ((unsigned long) dictGetEntryVal(de))+1; - dictGetEntryVal(de) = (void*) c; - incrRefCount(shared); - decrRefCount(o); - return shared; - } else { - /* Here we are using a stream algorihtm: Every time an object is - * shared we increment its count, everytime there is a miss we - * recrement the counter of a random object. If this object reaches - * zero we remove the object and put the current object instead. */ - if (dictSize(server.sharingpool) >= - server.sharingpoolsize) { - de = dictGetRandomKey(server.sharingpool); - redisAssert(de != NULL); - c = ((unsigned long) dictGetEntryVal(de))-1; - dictGetEntryVal(de) = (void*) c; - if (c == 0) { - dictDelete(server.sharingpool,de->key); - } - } else { - c = 0; /* If the pool is empty we want to add this object */ - } - if (c == 0) { - int retval; - - retval = dictAdd(server.sharingpool,o,(void*)1); - redisAssert(retval == DICT_OK); - incrRefCount(o); - } - return o; - } -} - /* Check if the nul-terminated string 's' can be represented by a long * (that is, is a number that fits into long without any other space or * character before or after the digits). 
@@ -2913,10 +3204,10 @@ static int isStringRepresentableAsLong(sds s, long *longval) { char buf[32], *endptr; long value; int slen; - + value = strtol(s, &endptr, 10); if (endptr[0] != '\0') return REDIS_ERR; - slen = snprintf(buf,32,"%ld",value); + slen = ll2string(buf,32,value); /* If the number converted back into a string is not identical * then it's not possible to encode the string as integer */ @@ -2926,36 +3217,42 @@ static int isStringRepresentableAsLong(sds s, long *longval) { } /* Try to encode a string object in order to save space */ -static int tryObjectEncoding(robj *o) { +static robj *tryObjectEncoding(robj *o) { long value; sds s = o->ptr; if (o->encoding != REDIS_ENCODING_RAW) - return REDIS_ERR; /* Already encoded */ + return o; /* Already encoded */ - /* It's not save to encode shared objects: shared objects can be shared + /* It's not safe to encode shared objects: shared objects can be shared * everywhere in the "object space" of Redis. Encoded objects can only * appear as "values" (and not, for instance, as keys) */ - if (o->refcount > 1) return REDIS_ERR; + if (o->refcount > 1) return o; /* Currently we try to encode only strings */ redisAssert(o->type == REDIS_STRING); /* Check if we can represent this string as a long integer */ - if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR; + if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return o; /* Ok, this object can be encoded */ - o->encoding = REDIS_ENCODING_INT; - sdsfree(o->ptr); - o->ptr = (void*) value; - return REDIS_OK; + if (value >= 0 && value < REDIS_SHARED_INTEGERS) { + decrRefCount(o); + incrRefCount(shared.integers[value]); + return shared.integers[value]; + } else { + o->encoding = REDIS_ENCODING_INT; + sdsfree(o->ptr); + o->ptr = (void*) value; + return o; + } } /* Get a decoded version of an encoded object (returned as a new object). * If the object is already raw-encoded just increment the ref count. 
*/ static robj *getDecodedObject(robj *o) { robj *dec; - + if (o->encoding == REDIS_ENCODING_RAW) { incrRefCount(o); return o; @@ -2963,17 +3260,17 @@ static robj *getDecodedObject(robj *o) { if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) { char buf[32]; - snprintf(buf,32,"%ld",(long)o->ptr); + ll2string(buf,32,(long)o->ptr); dec = createStringObject(buf,strlen(buf)); return dec; } else { - redisAssert(1 != 1); + redisPanic("Unknown encoding type"); } } /* Compare two string objects via strcmp() or alike. * Note that the objects may be integer-encoded. In such a case we - * use snprintf() to get a string representation of the numbers on the stack + * use ll2string() to get a string representation of the numbers on the stack * and compare the strings, it's much faster than calling getDecodedObject(). * * Important note: if objects are not integer encoded, but binary-safe strings, @@ -2986,14 +3283,14 @@ static int compareStringObjects(robj *a, robj *b) { if (a == b) return 0; if (a->encoding != REDIS_ENCODING_RAW) { - snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr); + ll2string(bufa,sizeof(bufa),(long) a->ptr); astr = bufa; bothsds = 0; } else { astr = a->ptr; } if (b->encoding != REDIS_ENCODING_RAW) { - snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr); + ll2string(bufb,sizeof(bufb),(long) b->ptr); bstr = bufb; bothsds = 0; } else { @@ -3002,6 +3299,18 @@ static int compareStringObjects(robj *a, robj *b) { return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr); } +/* Equal string objects return 1 if the two objects are the same from the + * point of view of a string comparison, otherwise 0 is returned. Note that + * this function is faster then checking for (compareStringObject(a,b) == 0) + * because it can perform some more optimization. 
*/ +static int equalStringObjects(robj *a, robj *b) { + if (a->encoding != REDIS_ENCODING_RAW && b->encoding != REDIS_ENCODING_RAW){ + return a->ptr == b->ptr; + } else { + return compareStringObjects(a,b) == 0; + } +} + static size_t stringObjectLen(robj *o) { redisAssert(o->type == REDIS_STRING); if (o->encoding == REDIS_ENCODING_RAW) { @@ -3009,36 +3318,253 @@ static size_t stringObjectLen(robj *o) { } else { char buf[32]; - return snprintf(buf,32,"%ld",(long)o->ptr); + return ll2string(buf,32,(long)o->ptr); } } -/*============================ RDB saving/loading =========================== */ +static int getDoubleFromObject(robj *o, double *target) { + double value; + char *eptr; -static int rdbSaveType(FILE *fp, unsigned char type) { - if (fwrite(&type,1,1,fp) == 0) return -1; - return 0; + if (o == NULL) { + value = 0; + } else { + redisAssert(o->type == REDIS_STRING); + if (o->encoding == REDIS_ENCODING_RAW) { + value = strtod(o->ptr, &eptr); + if (eptr[0] != '\0') return REDIS_ERR; + } else if (o->encoding == REDIS_ENCODING_INT) { + value = (long)o->ptr; + } else { + redisPanic("Unknown string encoding"); + } + } + + *target = value; + return REDIS_OK; } -static int rdbSaveTime(FILE *fp, time_t t) { - int32_t t32 = (int32_t) t; - if (fwrite(&t32,4,1,fp) == 0) return -1; - return 0; +static int getDoubleFromObjectOrReply(redisClient *c, robj *o, double *target, const char *msg) { + double value; + if (getDoubleFromObject(o, &value) != REDIS_OK) { + if (msg != NULL) { + addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); + } else { + addReplySds(c, sdsnew("-ERR value is not a double\r\n")); + } + return REDIS_ERR; + } + + *target = value; + return REDIS_OK; } -/* check rdbLoadLen() comments for more info */ -static int rdbSaveLen(FILE *fp, uint32_t len) { - unsigned char buf[2]; +static int getLongLongFromObject(robj *o, long long *target) { + long long value; + char *eptr; - if (len < (1<<6)) { - /* Save a 6 bit len */ - buf[0] = 
(len&0xFF)|(REDIS_RDB_6BITLEN<<6); - if (fwrite(buf,1,1,fp) == 0) return -1; - } else if (len < (1<<14)) { - /* Save a 14 bit len */ - buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6); - buf[1] = len&0xFF; - if (fwrite(buf,2,1,fp) == 0) return -1; + if (o == NULL) { + value = 0; + } else { + redisAssert(o->type == REDIS_STRING); + if (o->encoding == REDIS_ENCODING_RAW) { + value = strtoll(o->ptr, &eptr, 10); + if (eptr[0] != '\0') return REDIS_ERR; + } else if (o->encoding == REDIS_ENCODING_INT) { + value = (long)o->ptr; + } else { + redisPanic("Unknown string encoding"); + } + } + + *target = value; + return REDIS_OK; +} + +static int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, const char *msg) { + long long value; + if (getLongLongFromObject(o, &value) != REDIS_OK) { + if (msg != NULL) { + addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); + } else { + addReplySds(c, sdsnew("-ERR value is not an integer\r\n")); + } + return REDIS_ERR; + } + + *target = value; + return REDIS_OK; +} + +static int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg) { + long long value; + + if (getLongLongFromObjectOrReply(c, o, &value, msg) != REDIS_OK) return REDIS_ERR; + if (value < LONG_MIN || value > LONG_MAX) { + if (msg != NULL) { + addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg)); + } else { + addReplySds(c, sdsnew("-ERR value is out of range\r\n")); + } + return REDIS_ERR; + } + + *target = value; + return REDIS_OK; +} + +/* =========================== Keyspace access API ========================== */ + +static robj *lookupKey(redisDb *db, robj *key) { + dictEntry *de = dictFind(db->dict,key->ptr); + if (de) { + robj *val = dictGetEntryVal(de); + + if (server.vm_enabled) { + if (val->storage == REDIS_VM_MEMORY || + val->storage == REDIS_VM_SWAPPING) + { + /* If we were swapping the object out, cancel the operation */ + if (val->storage == REDIS_VM_SWAPPING) + vmCancelThreadedIOJob(val); + /* 
Update the access time for the aging algorithm. */ + val->lru = server.lruclock; + } else { + int notify = (val->storage == REDIS_VM_LOADING); + + /* Our value was swapped on disk. Bring it at home. */ + redisAssert(val->type == REDIS_VMPOINTER); + val = vmLoadObject(val); + dictGetEntryVal(de) = val; + + /* Clients blocked by the VM subsystem may be waiting for + * this key... */ + if (notify) handleClientsBlockedOnSwappedKey(db,key); + } + } + return val; + } else { + return NULL; + } +} + +static robj *lookupKeyRead(redisDb *db, robj *key) { + expireIfNeeded(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyWrite(redisDb *db, robj *key) { + deleteIfVolatile(db,key); + touchWatchedKey(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) { + robj *o = lookupKeyRead(c->db, key); + if (!o) addReply(c,reply); + return o; +} + +static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) { + robj *o = lookupKeyWrite(c->db, key); + if (!o) addReply(c,reply); + return o; +} + +/* Add the key to the DB. If the key already exists REDIS_ERR is returned, + * otherwise REDIS_OK is returned, and the caller should increment the + * refcount of 'val'. */ +static int dbAdd(redisDb *db, robj *key, robj *val) { + /* Perform a lookup before adding the key, as we need to copy the + * key value. */ + if (dictFind(db->dict, key->ptr) != NULL) { + return REDIS_ERR; + } else { + sds copy = sdsdup(key->ptr); + dictAdd(db->dict, copy, val); + return REDIS_OK; + } +} + +/* If the key does not exist, this is just like dbAdd(). Otherwise + * the value associated to the key is replaced with the new one. + * + * On update (key already existed) 0 is returned. Otherwise 1. 
*/ +static int dbReplace(redisDb *db, robj *key, robj *val) { + if (dictFind(db->dict,key->ptr) == NULL) { + sds copy = sdsdup(key->ptr); + dictAdd(db->dict, copy, val); + return 1; + } else { + dictReplace(db->dict, key->ptr, val); + return 0; + } +} + +static int dbExists(redisDb *db, robj *key) { + return dictFind(db->dict,key->ptr) != NULL; +} + +/* Return a random key, in form of a Redis object. + * If there are no keys, NULL is returned. + * + * The function makes sure to return keys not already expired. */ +static robj *dbRandomKey(redisDb *db) { + struct dictEntry *de; + + while(1) { + sds key; + robj *keyobj; + + de = dictGetRandomKey(db->dict); + if (de == NULL) return NULL; + + key = dictGetEntryKey(de); + keyobj = createStringObject(key,sdslen(key)); + if (dictFind(db->expires,key)) { + if (expireIfNeeded(db,keyobj)) { + decrRefCount(keyobj); + continue; /* search for another key. This expired. */ + } + } + return keyobj; + } +} + +/* Delete a key, value, and associated expiration entry if any, from the DB */ +static int dbDelete(redisDb *db, robj *key) { + /* Deleting an entry from the expires dict will not free the sds of + * the key, because it is shared with the main dictionary. 
*/ + if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); + return dictDelete(db->dict,key->ptr) == DICT_OK; +} + +/*============================ RDB saving/loading =========================== */ + +static int rdbSaveType(FILE *fp, unsigned char type) { + if (fwrite(&type,1,1,fp) == 0) return -1; + return 0; +} + +static int rdbSaveTime(FILE *fp, time_t t) { + int32_t t32 = (int32_t) t; + if (fwrite(&t32,4,1,fp) == 0) return -1; + return 0; +} + +/* check rdbLoadLen() comments for more info */ +static int rdbSaveLen(FILE *fp, uint32_t len) { + unsigned char buf[2]; + + if (len < (1<<6)) { + /* Save a 6 bit len */ + buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6); + if (fwrite(buf,1,1,fp) == 0) return -1; + } else if (len < (1<<14)) { + /* Save a 14 bit len */ + buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6); + buf[1] = len&0xFF; + if (fwrite(buf,2,1,fp) == 0) return -1; } else { /* Save a 32 bit len */ buf[0] = (REDIS_RDB_32BITLEN<<6); @@ -3049,22 +3575,12 @@ static int rdbSaveLen(FILE *fp, uint32_t len) { return 0; } -/* String objects in the form "2391" "-100" without any space and with a - * range of values that can fit in an 8, 16 or 32 bit signed value can be - * encoded as integers to save space */ -static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { - long long value; - char *endptr, buf[32]; - - /* Check if it's possible to encode this value as a number */ - value = strtoll(s, &endptr, 10); - if (endptr[0] != '\0') return 0; - snprintf(buf,32,"%lld",value); - - /* If the number converted back into a string is not identical - * then it's not possible to encode the string as integer */ - if (strlen(buf) != len || memcmp(buf,s,len)) return 0; - +/* Encode 'value' as an integer if possible (if integer will fit the + * supported range). If the function sucessful encoded the integer + * then the (up to 5 bytes) encoded representation is written in the + * string pointed by 'enc' and the length is returned. 
Otherwise + * 0 is returned. */ +static int rdbEncodeInteger(long long value, unsigned char *enc) { /* Finally check if it fits in our ranges */ if (value >= -(1<<7) && value <= (1<<7)-1) { enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8; @@ -3087,6 +3603,25 @@ static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { } } +/* String objects in the form "2391" "-100" without any space and with a + * range of values that can fit in an 8, 16 or 32 bit signed value can be + * encoded as integers to save space */ +static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { + long long value; + char *endptr, buf[32]; + + /* Check if it's possible to encode this value as a number */ + value = strtoll(s, &endptr, 10); + if (endptr[0] != '\0') return 0; + ll2string(buf,32,value); + + /* If the number converted back into a string is not identical + * then it's not possible to encode the string as integer */ + if (strlen(buf) != len || memcmp(buf,s,len)) return 0; + + return rdbEncodeInteger(value,enc); +} + static int rdbSaveLzfStringObject(FILE *fp, unsigned char *s, size_t len) { size_t comprlen, outlen; unsigned char byte; @@ -3146,23 +3681,32 @@ static int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) { return 0; } +/* Save a long long value as either an encoded string or a string. */ +static int rdbSaveLongLongAsStringObject(FILE *fp, long long value) { + unsigned char buf[32]; + int enclen = rdbEncodeInteger(value,buf); + if (enclen > 0) { + if (fwrite(buf,enclen,1,fp) == 0) return -1; + } else { + /* Encode as string */ + enclen = ll2string((char*)buf,32,value); + redisAssert(enclen < 32); + if (rdbSaveLen(fp,enclen) == -1) return -1; + if (fwrite(buf,enclen,1,fp) == 0) return -1; + } + return 0; +} + /* Like rdbSaveStringObjectRaw() but handle encoded objects */ static int rdbSaveStringObject(FILE *fp, robj *obj) { - int retval; - - /* Avoid incr/decr ref count business when possible. 
- * This plays well with copy-on-write given that we are probably - * in a child process (BGSAVE). Also this makes sure key objects - * of swapped objects are not incRefCount-ed (an assert does not allow - * this in order to avoid bugs) */ - if (obj->encoding != REDIS_ENCODING_RAW) { - obj = getDecodedObject(obj); - retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); - decrRefCount(obj); + /* Avoid to decode the object, then encode it again, if the + * object is alrady integer encoded. */ + if (obj->encoding == REDIS_ENCODING_INT) { + return rdbSaveLongLongAsStringObject(fp,(long)obj->ptr); } else { - retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); + redisAssert(obj->encoding == REDIS_ENCODING_RAW); + return rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); } - return retval; } /* Save a double value. Doubles are saved as strings prefixed by an unsigned @@ -3184,7 +3728,23 @@ static int rdbSaveDoubleValue(FILE *fp, double val) { len = 1; buf[0] = (val < 0) ? 255 : 254; } else { - snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val); +#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL) + /* Check if the float is in a safe range to be casted into a + * long long. We are assuming that long long is 64 bit here. + * Also we are assuming that there are no implementations around where + * double has precision < 52 bit. + * + * Under this assumptions we test if a double is inside an interval + * where casting to long long is safe. Then using two castings we + * make sure the decimal part is zero. If all this is true we use + * integer printing function that is much faster. 
*/ + double min = -4503599627370495; /* (2^52)-1 */ + double max = 4503599627370496; /* -(2^52) */ + if (val > min && val < max && val == ((double)((long long)val))) + ll2string((char*)buf+1,sizeof(buf),(long long)val); + else +#endif + snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val); buf[0] = strlen((char*)buf+1); len = buf[0]+1; } @@ -3199,16 +3759,37 @@ static int rdbSaveObject(FILE *fp, robj *o) { if (rdbSaveStringObject(fp,o) == -1) return -1; } else if (o->type == REDIS_LIST) { /* Save a list value */ - list *list = o->ptr; - listIter li; - listNode *ln; - - if (rdbSaveLen(fp,listLength(list)) == -1) return -1; - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *eleobj = listNodeValue(ln); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + if (rdbSaveLen(fp,ziplistLen(o->ptr)) == -1) return -1; + p = ziplistIndex(o->ptr,0); + while(ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + if (rdbSaveRawString(fp,vstr,vlen) == -1) + return -1; + } else { + if (rdbSaveLongLongAsStringObject(fp,vlong) == -1) + return -1; + } + p = ziplistNext(o->ptr,p); + } + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + list *list = o->ptr; + listIter li; + listNode *ln; - if (rdbSaveStringObject(fp,eleobj) == -1) return -1; + if (rdbSaveLen(fp,listLength(list)) == -1) return -1; + listRewind(list,&li); + while((ln = listNext(&li))) { + robj *eleobj = listNodeValue(ln); + if (rdbSaveStringObject(fp,eleobj) == -1) return -1; + } + } else { + redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { /* Save a set value */ @@ -3266,7 +3847,7 @@ static int rdbSaveObject(FILE *fp, robj *o) { dictReleaseIterator(di); } } else { - redisAssert(0); + redisPanic("Unknown object type"); } return 0; } @@ -3285,7 +3866,7 @@ static off_t rdbSavedObjectLen(robj *o, FILE *fp) { /* Return the number of pages required to save this object in the swap file */ static off_t 
rdbSavedObjectPages(robj *o, FILE *fp) { off_t bytes = rdbSavedObjectLen(o,fp); - + return (bytes+(server.vm_page_size-1))/server.vm_page_size; } @@ -3327,9 +3908,12 @@ static int rdbSave(char *filename) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key = dictGetEntryKey(de); - robj *o = dictGetEntryVal(de); - time_t expiretime = getExpire(db,key); + sds keystr = dictGetEntryKey(de); + robj key, *o = dictGetEntryVal(de); + time_t expiretime; + + initStaticStringObject(key,keystr); + expiretime = getExpire(db,&key); /* Save the expire time */ if (expiretime != -1) { @@ -3340,20 +3924,20 @@ static int rdbSave(char *filename) { } /* Save the key and associated value. This requires special * handling if the value is swapped out. */ - if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) { + if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || + o->storage == REDIS_VM_SWAPPING) { /* Save type, key, value */ if (rdbSaveType(fp,o->type) == -1) goto werr; - if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,o) == -1) goto werr; } else { /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ robj *po; /* Get a preview of the object in memory */ - po = vmPreviewObject(key); + po = vmPreviewObject(o); /* Save type, key, value */ - if (rdbSaveType(fp,key->vtype) == -1) goto werr; - if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (rdbSaveType(fp,po->type) == -1) goto werr; + if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,po) == -1) goto werr; /* Remove the loaded object from memory */ decrRefCount(po); @@ -3368,7 +3952,7 @@ static int rdbSave(char *filename) { fflush(fp); fsync(fileno(fp)); fclose(fp); - + /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. 
*/ if (rename(tmpfile,filename) == -1) { @@ -3412,6 +3996,7 @@ static int rdbSaveBackground(char *filename) { } redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid); server.bgsavechildpid = childpid; + updateDictResizePolicy(); return REDIS_OK; } return REDIS_OK; /* unreached */ @@ -3467,7 +4052,11 @@ static uint32_t rdbLoadLen(FILE *fp, int *isencoded) { } } -static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { +/* Load an integer-encoded object from file 'fp', with the specified + * encoding type 'enctype'. If encode is true the function may return + * an integer-encoded object as reply, otherwise the returned object + * will always be encoded as a raw string. */ +static robj *rdbLoadIntegerObject(FILE *fp, int enctype, int encode) { unsigned char enc[4]; long long val; @@ -3486,9 +4075,12 @@ static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { val = (int32_t)v; } else { val = 0; /* anti-warning */ - redisAssert(0); + redisPanic("Unknown RDB integer encoding type"); } - return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val)); + if (encode) + return createStringObjectFromLongLong(val); + else + return createObject(REDIS_STRING,sdsfromlonglong(val)); } static robj *rdbLoadLzfStringObject(FILE*fp) { @@ -3510,7 +4102,7 @@ err: return NULL; } -static robj *rdbLoadStringObject(FILE*fp) { +static robj *rdbGenericLoadStringObject(FILE*fp, int encode) { int isencoded; uint32_t len; sds val; @@ -3521,11 +4113,11 @@ static robj *rdbLoadStringObject(FILE*fp) { case REDIS_RDB_ENC_INT8: case REDIS_RDB_ENC_INT16: case REDIS_RDB_ENC_INT32: - return tryObjectSharing(rdbLoadIntegerObject(fp,len)); + return rdbLoadIntegerObject(fp,len,encode); case REDIS_RDB_ENC_LZF: - return tryObjectSharing(rdbLoadLzfStringObject(fp)); + return rdbLoadLzfStringObject(fp); default: - redisAssert(0); + redisPanic("Unknown RDB encoding type"); } } @@ -3535,7 +4127,15 @@ static robj *rdbLoadStringObject(FILE*fp) { sdsfree(val); return NULL; } - return 
tryObjectSharing(createObject(REDIS_STRING,val)); + return createObject(REDIS_STRING,val); +} + +static robj *rdbLoadStringObject(FILE *fp) { + return rdbGenericLoadStringObject(fp,0); +} + +static robj *rdbLoadEncodedStringObject(FILE *fp) { + return rdbGenericLoadStringObject(fp,1); } /* For information about double serialization check rdbSaveDoubleValue() */ @@ -3559,35 +4159,60 @@ static int rdbLoadDoubleValue(FILE *fp, double *val) { /* Load a Redis object of the specified type from the specified file. * On success a newly allocated object is returned, otherwise NULL. */ static robj *rdbLoadObject(int type, FILE *fp) { - robj *o; + robj *o, *ele, *dec; + size_t len; redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp)); if (type == REDIS_STRING) { /* Read string value */ - if ((o = rdbLoadStringObject(fp)) == NULL) return NULL; - tryObjectEncoding(o); - } else if (type == REDIS_LIST || type == REDIS_SET) { - /* Read list/set value */ - uint32_t listlen; + if ((o = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + o = tryObjectEncoding(o); + } else if (type == REDIS_LIST) { + /* Read list value */ + if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + + /* Use a real list when there are too many entries */ + if (len > server.list_max_ziplist_entries) { + o = createListObject(); + } else { + o = createZiplistObject(); + } - if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; - o = (type == REDIS_LIST) ? 
createListObject() : createSetObject(); - /* It's faster to expand the dict to the right size asap in order - * to avoid rehashing */ - if (type == REDIS_SET && listlen > DICT_HT_INITIAL_SIZE) - dictExpand(o->ptr,listlen); - /* Load every single element of the list/set */ - while(listlen--) { - robj *ele; + /* Load every single element of the list */ + while(len--) { + if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + + /* If we are using a ziplist and the value is too big, convert + * the object to a real list. */ + if (o->encoding == REDIS_ENCODING_ZIPLIST && + ele->encoding == REDIS_ENCODING_RAW && + sdslen(ele->ptr) > server.list_max_ziplist_value) + listTypeConvert(o,REDIS_ENCODING_LINKEDLIST); - if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL; - tryObjectEncoding(ele); - if (type == REDIS_LIST) { - listAddNodeTail((list*)o->ptr,ele); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + dec = getDecodedObject(ele); + o->ptr = ziplistPush(o->ptr,dec->ptr,sdslen(dec->ptr),REDIS_TAIL); + decrRefCount(dec); + decrRefCount(ele); } else { - dictAdd((dict*)o->ptr,ele,NULL); + ele = tryObjectEncoding(ele); + listAddNodeTail(o->ptr,ele); } } + } else if (type == REDIS_SET) { + /* Read list/set value */ + if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + o = createSetObject(); + /* It's faster to expand the dict to the right size asap in order + * to avoid rehashing */ + if (len > DICT_HT_INITIAL_SIZE) + dictExpand(o->ptr,len); + /* Load every single element of the list/set */ + while(len--) { + if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + ele = tryObjectEncoding(ele); + dictAdd((dict*)o->ptr,ele,NULL); + } } else if (type == REDIS_ZSET) { /* Read list/set value */ size_t zsetlen; @@ -3601,8 +4226,8 @@ static robj *rdbLoadObject(int type, FILE *fp) { robj *ele; double *score = zmalloc(sizeof(double)); - if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL; - tryObjectEncoding(ele); + if ((ele = 
rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + ele = tryObjectEncoding(ele); if (rdbLoadDoubleValue(fp,score) == -1) return NULL; dictAdd(zs->dict,ele,score); zslInsert(zs->zsl,*score,ele); @@ -3621,47 +4246,53 @@ static robj *rdbLoadObject(int type, FILE *fp) { while(hashlen--) { robj *key, *val; - if ((key = rdbLoadStringObject(fp)) == NULL) return NULL; - if ((val = rdbLoadStringObject(fp)) == NULL) return NULL; + if ((key = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + if ((val = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; /* If we are using a zipmap and there are too big values * the object is converted to real hash table encoding. */ if (o->encoding != REDIS_ENCODING_HT && - (sdslen(key->ptr) > server.hash_max_zipmap_value || - sdslen(val->ptr) > server.hash_max_zipmap_value)) + ((key->encoding == REDIS_ENCODING_RAW && + sdslen(key->ptr) > server.hash_max_zipmap_value) || + (val->encoding == REDIS_ENCODING_RAW && + sdslen(val->ptr) > server.hash_max_zipmap_value))) { convertToRealHash(o); } if (o->encoding == REDIS_ENCODING_ZIPMAP) { unsigned char *zm = o->ptr; + robj *deckey, *decval; - zm = zipmapSet(zm,key->ptr,sdslen(key->ptr), - val->ptr,sdslen(val->ptr),NULL); + /* We need raw string objects to add them to the zipmap */ + deckey = getDecodedObject(key); + decval = getDecodedObject(val); + zm = zipmapSet(zm,deckey->ptr,sdslen(deckey->ptr), + decval->ptr,sdslen(decval->ptr),NULL); o->ptr = zm; + decrRefCount(deckey); + decrRefCount(decval); decrRefCount(key); decrRefCount(val); } else { - tryObjectEncoding(key); - tryObjectEncoding(val); + key = tryObjectEncoding(key); + val = tryObjectEncoding(val); dictAdd((dict*)o->ptr,key,val); } } } else { - redisAssert(0); + redisPanic("Unknown object type"); } return o; } static int rdbLoad(char *filename) { FILE *fp; - robj *keyobj = NULL; uint32_t dbid; int type, retval, rdbver; - dict *d = server.db[0].dict; + int swap_all_values = 0; redisDb *db = server.db+0; char buf[1024]; - 
time_t expiretime = -1, now = time(NULL); - long long loadedkeys = 0; + time_t expiretime, now = time(NULL); fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; @@ -3679,8 +4310,10 @@ static int rdbLoad(char *filename) { return REDIS_ERR; } while(1) { - robj *o; + robj *key, *val; + int force_swapout; + expiretime = -1; /* Read type. */ if ((type = rdbLoadType(fp)) == -1) goto eoferr; if (type == REDIS_EXPIRETIME) { @@ -3698,106 +4331,180 @@ static int rdbLoad(char *filename) { exit(1); } db = server.db+dbid; - d = db->dict; continue; } /* Read key */ - if ((keyobj = rdbLoadStringObject(fp)) == NULL) goto eoferr; + if ((key = rdbLoadStringObject(fp)) == NULL) goto eoferr; /* Read value */ - if ((o = rdbLoadObject(type,fp)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,fp)) == NULL) goto eoferr; + /* Check if the key already expired */ + if (expiretime != -1 && expiretime < now) { + decrRefCount(key); + decrRefCount(val); + continue; + } /* Add the new object in the hash table */ - retval = dictAdd(d,keyobj,o); - if (retval == DICT_ERR) { - redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr); + retval = dbAdd(db,key,val); + if (retval == REDIS_ERR) { + redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", key->ptr); exit(1); } /* Set the expire time if needed */ - if (expiretime != -1) { - setExpire(db,keyobj,expiretime); - /* Delete this key if already expired */ - if (expiretime < now) deleteKey(db,keyobj); - expiretime = -1; - } - keyobj = o = NULL; + if (expiretime != -1) setExpire(db,key,expiretime); + /* Handle swapping while loading big datasets when VM is on */ - loadedkeys++; - if (server.vm_enabled && (loadedkeys % 5000) == 0) { + + /* If we detecter we are hopeless about fitting something in memory + * we just swap every new key on disk. Directly... 
+ * Note that's important to check for this condition before resorting + * to random sampling, otherwise we may try to swap already + * swapped keys. */ + if (swap_all_values) { + dictEntry *de = dictFind(db->dict,key->ptr); + + /* de may be NULL since the key already expired */ + if (de) { + vmpointer *vp; + val = dictGetEntryVal(de); + + if (val->refcount == 1 && + (vp = vmSwapObjectBlocking(val)) != NULL) + dictGetEntryVal(de) = vp; + } + decrRefCount(key); + continue; + } + decrRefCount(key); + + /* Flush data on disk once 32 MB of additional RAM are used... */ + force_swapout = 0; + if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) + force_swapout = 1; + + /* If we have still some hope of having some value fitting memory + * then we try random sampling. */ + if (!swap_all_values && server.vm_enabled && force_swapout) { while (zmalloc_used_memory() > server.vm_max_memory) { if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } + if (zmalloc_used_memory() > server.vm_max_memory) + swap_all_values = 1; /* We are already using too much mem */ } } fclose(fp); return REDIS_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ - if (keyobj) decrRefCount(keyobj); redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); exit(1); return REDIS_ERR; /* Just to avoid warning */ } -/*================================== Commands =============================== */ - -static void authCommand(redisClient *c) { - if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) { - c->authenticated = 1; - addReply(c,shared.ok); - } else { - c->authenticated = 0; - addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n")); +/*================================== Shutdown =============================== */ +static int prepareForShutdown() { + redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); + /* Kill the saving child if there is a background saving in progress. 
+ We want to avoid race conditions, for instance our saving child may + overwrite the synchronous saving did by SHUTDOWN. */ + if (server.bgsavechildpid != -1) { + redisLog(REDIS_WARNING,"There is a live saving child. Killing it!"); + kill(server.bgsavechildpid,SIGKILL); + rdbRemoveTempFile(server.bgsavechildpid); } -} - -static void pingCommand(redisClient *c) { - addReply(c,shared.pong); -} - -static void echoCommand(redisClient *c) { - addReplyBulk(c,c->argv[1]); -} - -/*=================================== Strings =============================== */ - -static void setGenericCommand(redisClient *c, int nx) { + if (server.appendonly) { + /* Append only file: fsync() the AOF and exit */ + aof_fsync(server.appendfd); + if (server.vm_enabled) unlink(server.vm_swap_file); + } else { + /* Snapshotting. Perform a SYNC SAVE and exit */ + if (rdbSave(server.dbfilename) == REDIS_OK) { + if (server.daemonize) + unlink(server.pidfile); + redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); + } else { + /* Ooops.. error saving! The best we can do is to continue + * operating. Note that if there was a background saving process, + * in the next cron() Redis will be notified that the background + * saving aborted, handling special stuff like slaves pending for + * synchronization... 
*/ + redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); + return REDIS_ERR; + } + } + redisLog(REDIS_WARNING,"Server exit now, bye bye..."); + return REDIS_OK; +} + +/*================================== Commands =============================== */ + +static void authCommand(redisClient *c) { + if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) { + c->authenticated = 1; + addReply(c,shared.ok); + } else { + c->authenticated = 0; + addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n")); + } +} + +static void pingCommand(redisClient *c) { + addReply(c,shared.pong); +} + +static void echoCommand(redisClient *c) { + addReplyBulk(c,c->argv[1]); +} + +/*=================================== Strings =============================== */ + +static void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire) { int retval; + long seconds = 0; /* initialized to avoid an harmness warning */ + + if (expire) { + if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK) + return; + if (seconds <= 0) { + addReplySds(c,sdsnew("-ERR invalid expire time in SETEX\r\n")); + return; + } + } - if (nx) deleteIfVolatile(c->db,c->argv[1]); - retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); - if (retval == DICT_ERR) { + touchWatchedKey(c->db,key); + if (nx) deleteIfVolatile(c->db,key); + retval = dbAdd(c->db,key,val); + if (retval == REDIS_ERR) { if (!nx) { - /* If the key is about a swapped value, we want a new key object - * to overwrite the old. So we delete the old key in the database. - * This will also make sure that swap pages about the old object - * will be marked as free. 
*/ - if (server.vm_enabled && deleteIfSwapped(c->db,c->argv[1])) - incrRefCount(c->argv[1]); - dictReplace(c->db->dict,c->argv[1],c->argv[2]); - incrRefCount(c->argv[2]); + dbReplace(c->db,key,val); + incrRefCount(val); } else { addReply(c,shared.czero); return; } } else { - incrRefCount(c->argv[1]); - incrRefCount(c->argv[2]); + incrRefCount(val); } server.dirty++; - removeExpire(c->db,c->argv[1]); + removeExpire(c->db,key); + if (expire) setExpire(c->db,key,time(NULL)+seconds); addReply(c, nx ? shared.cone : shared.ok); } static void setCommand(redisClient *c) { - setGenericCommand(c,0); + setGenericCommand(c,0,c->argv[1],c->argv[2],NULL); } static void setnxCommand(redisClient *c) { - setGenericCommand(c,1); + setGenericCommand(c,1,c->argv[1],c->argv[2],NULL); +} + +static void setexCommand(redisClient *c) { + setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2]); } static int getGenericCommand(redisClient *c) { robj *o; - + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL) return REDIS_OK; @@ -3816,11 +4523,7 @@ static void getCommand(redisClient *c) { static void getsetCommand(redisClient *c) { if (getGenericCommand(c) == REDIS_ERR) return; - if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) { - dictReplace(c->db->dict,c->argv[1],c->argv[2]); - } else { - incrRefCount(c->argv[1]); - } + dbReplace(c->db,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); server.dirty++; removeExpire(c->db,c->argv[1]); @@ -3828,7 +4531,7 @@ static void getsetCommand(redisClient *c) { static void mgetCommand(redisClient *c) { int j; - + addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1)); for (j = 1; j < c->argc; j++) { robj *o = lookupKeyRead(c->db,c->argv[j]); @@ -3866,17 +4569,9 @@ static void msetGenericCommand(redisClient *c, int nx) { } for (j = 1; j < c->argc; j += 2) { - int retval; - - tryObjectEncoding(c->argv[j+1]); - retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]); - if (retval == DICT_ERR) { - 
dictReplace(c->db->dict,c->argv[j],c->argv[j+1]); - incrRefCount(c->argv[j+1]); - } else { - incrRefCount(c->argv[j]); - incrRefCount(c->argv[j+1]); - } + c->argv[j+1] = tryObjectEncoding(c->argv[j+1]); + dbReplace(c->db,c->argv[j],c->argv[j+1]); + incrRefCount(c->argv[j+1]); removeExpire(c->db,c->argv[j]); } server.dirty += (c->argc-1)/2; @@ -3893,37 +4588,15 @@ static void msetnxCommand(redisClient *c) { static void incrDecrCommand(redisClient *c, long long incr) { long long value; - int retval; robj *o; - - o = lookupKeyWrite(c->db,c->argv[1]); - if (o == NULL) { - value = 0; - } else { - if (o->type != REDIS_STRING) { - value = 0; - } else { - char *eptr; - if (o->encoding == REDIS_ENCODING_RAW) - value = strtoll(o->ptr, &eptr, 10); - else if (o->encoding == REDIS_ENCODING_INT) - value = (long)o->ptr; - else - redisAssert(1 != 1); - } - } + o = lookupKeyWrite(c->db,c->argv[1]); + if (o != NULL && checkType(c,o,REDIS_STRING)) return; + if (getLongLongFromObjectOrReply(c,o,&value,NULL) != REDIS_OK) return; value += incr; - o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value)); - tryObjectEncoding(o); - retval = dictAdd(c->db->dict,c->argv[1],o); - if (retval == DICT_ERR) { - dictReplace(c->db->dict,c->argv[1],o); - removeExpire(c->db,c->argv[1]); - } else { - incrRefCount(c->argv[1]); - } + o = createStringObjectFromLongLong(value); + dbReplace(c->db,c->argv[1],o); server.dirty++; addReply(c,shared.colon); addReply(c,o); @@ -3939,12 +4612,16 @@ static void decrCommand(redisClient *c) { } static void incrbyCommand(redisClient *c) { - long long incr = strtoll(c->argv[2]->ptr, NULL, 10); + long long incr; + + if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != REDIS_OK) return; incrDecrCommand(c,incr); } static void decrbyCommand(redisClient *c) { - long long incr = strtoll(c->argv[2]->ptr, NULL, 10); + long long incr; + + if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != REDIS_OK) return; incrDecrCommand(c,-incr); } @@ 
-3956,17 +4633,10 @@ static void appendCommand(redisClient *c) { o = lookupKeyWrite(c->db,c->argv[1]); if (o == NULL) { /* Create the key */ - retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); - incrRefCount(c->argv[1]); + retval = dbAdd(c->db,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); totlen = stringObjectLen(c->argv[2]); } else { - dictEntry *de; - - de = dictFind(c->db->dict,c->argv[1]); - assert(de != NULL); - - o = dictGetEntryVal(de); if (o->type != REDIS_STRING) { addReply(c,shared.wrongtypeerr); return; @@ -3978,7 +4648,7 @@ static void appendCommand(redisClient *c) { o = createStringObject(decoded->ptr, sdslen(decoded->ptr)); decrRefCount(decoded); - dictReplace(c->db->dict,c->argv[1],o); + dbReplace(c->db,c->argv[1],o); } /* APPEND! */ if (c->argv[2]->encoding == REDIS_ENCODING_RAW) { @@ -4037,21 +4707,27 @@ static void delCommand(redisClient *c) { int deleted = 0, j; for (j = 1; j < c->argc; j++) { - if (deleteKey(c->db,c->argv[j])) { + if (dbDelete(c->db,c->argv[j])) { + touchWatchedKey(c->db,c->argv[j]); server.dirty++; deleted++; } } - addReplyLong(c,deleted); + addReplyLongLong(c,deleted); } static void existsCommand(redisClient *c) { - addReply(c,lookupKeyRead(c->db,c->argv[1]) ? 
shared.cone : shared.czero); + expireIfNeeded(c->db,c->argv[1]); + if (dbExists(c->db,c->argv[1])) { + addReply(c, shared.cone); + } else { + addReply(c, shared.czero); + } } static void selectCommand(redisClient *c) { int id = atoi(c->argv[1]->ptr); - + if (selectDb(c,id) == REDIS_ERR) { addReplySds(c,sdsnew("-ERR invalid DB index\r\n")); } else { @@ -4060,20 +4736,15 @@ static void selectCommand(redisClient *c) { } static void randomkeyCommand(redisClient *c) { - dictEntry *de; - - while(1) { - de = dictGetRandomKey(c->db->dict); - if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break; - } - if (de == NULL) { - addReply(c,shared.plus); - addReply(c,shared.crlf); - } else { - addReply(c,shared.plus); - addReply(c,dictGetEntryKey(de)); - addReply(c,shared.crlf); + robj *key; + + if ((key = dbRandomKey(c->db)) == NULL) { + addReply(c,shared.nullbulk); + return; } + + addReplyBulk(c,key); + decrRefCount(key); } static void keysCommand(redisClient *c) { @@ -4088,15 +4759,17 @@ static void keysCommand(redisClient *c) { addReply(c,lenobj); decrRefCount(lenobj); while((de = dictNext(di)) != NULL) { - robj *keyobj = dictGetEntryKey(de); + sds key = dictGetEntryKey(de); + robj *keyobj; - sds key = keyobj->ptr; if ((pattern[0] == '*' && pattern[1] == '\0') || stringmatchlen(pattern,plen,key,sdslen(key),0)) { + keyobj = createStringObject(key,sdslen(key)); if (expireIfNeeded(c->db,keyobj) == 0) { addReplyBulk(c,keyobj); numkeys++; } + decrRefCount(keyobj); } } dictReleaseIterator(di); @@ -4160,40 +4833,9 @@ static void bgsaveCommand(redisClient *c) { } static void shutdownCommand(redisClient *c) { - redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); - /* Kill the saving child if there is a background saving in progress. - We want to avoid race conditions, for instance our saving child may - overwrite the synchronous saving did by SHUTDOWN. */ - if (server.bgsavechildpid != -1) { - redisLog(REDIS_WARNING,"There is a live saving child. 
Killing it!"); - kill(server.bgsavechildpid,SIGKILL); - rdbRemoveTempFile(server.bgsavechildpid); - } - if (server.appendonly) { - /* Append only file: fsync() the AOF and exit */ - fsync(server.appendfd); - if (server.vm_enabled) unlink(server.vm_swap_file); + if (prepareForShutdown() == REDIS_OK) exit(0); - } else { - /* Snapshotting. Perform a SYNC SAVE and exit */ - if (rdbSave(server.dbfilename) == REDIS_OK) { - if (server.daemonize) - unlink(server.pidfile); - redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); - redisLog(REDIS_WARNING,"Server exit now, bye bye..."); - if (server.vm_enabled) unlink(server.vm_swap_file); - exit(0); - } else { - /* Ooops.. error saving! The best we can do is to continue - * operating. Note that if there was a background saving process, - * in the next cron() Redis will be notified that the background - * saving aborted, handling special stuff like slaves pending for - * synchronization... */ - redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); - addReplySds(c, - sdsnew("-ERR can't quit, problems saving the DB\r\n")); - } - } + addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. Check logs.\r\n")); } static void renameGenericCommand(redisClient *c, int nx) { @@ -4210,17 +4852,16 @@ static void renameGenericCommand(redisClient *c, int nx) { incrRefCount(o); deleteIfVolatile(c->db,c->argv[2]); - if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) { + if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) { if (nx) { decrRefCount(o); addReply(c,shared.czero); return; } - dictReplace(c->db->dict,c->argv[2],o); - } else { - incrRefCount(c->argv[2]); + dbReplace(c->db,c->argv[2],o); } - deleteKey(c->db,c->argv[1]); + dbDelete(c->db,c->argv[1]); + touchWatchedKey(c->db,c->argv[2]); server.dirty++; addReply(c,nx ? 
shared.cone : shared.ok); } @@ -4264,170 +4905,500 @@ static void moveCommand(redisClient *c) { /* Try to add the element to the target DB */ deleteIfVolatile(dst,c->argv[1]); - if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) { + if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) { addReply(c,shared.czero); return; } - incrRefCount(c->argv[1]); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ - deleteKey(src,c->argv[1]); + dbDelete(src,c->argv[1]); server.dirty++; addReply(c,shared.cone); } /* =================================== Lists ================================ */ -static void pushGenericCommand(redisClient *c, int where) { - robj *lobj; - list *list; - lobj = lookupKeyWrite(c->db,c->argv[1]); - if (lobj == NULL) { - if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) { - addReply(c,shared.cone); - return; - } - lobj = createListObject(); - list = lobj->ptr; + +/* Check the argument length to see if it requires us to convert the ziplist + * to a real list. Only check raw-encoded objects because integer encoded + * objects are never too long. */ +static void listTypeTryConversion(robj *subject, robj *value) { + if (subject->encoding != REDIS_ENCODING_ZIPLIST) return; + if (value->encoding == REDIS_ENCODING_RAW && + sdslen(value->ptr) > server.list_max_ziplist_value) + listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); +} + +static void listTypePush(robj *subject, robj *value, int where) { + /* Check if we need to convert the ziplist */ + listTypeTryConversion(subject,value); + if (subject->encoding == REDIS_ENCODING_ZIPLIST && + ziplistLen(subject->ptr) >= server.list_max_ziplist_entries) + listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); + + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + int pos = (where == REDIS_HEAD) ? 
ZIPLIST_HEAD : ZIPLIST_TAIL; + value = getDecodedObject(value); + subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos); + decrRefCount(value); + } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) { if (where == REDIS_HEAD) { - listAddNodeHead(list,c->argv[2]); + listAddNodeHead(subject->ptr,value); } else { - listAddNodeTail(list,c->argv[2]); + listAddNodeTail(subject->ptr,value); } - dictAdd(c->db->dict,c->argv[1],lobj); - incrRefCount(c->argv[1]); - incrRefCount(c->argv[2]); + incrRefCount(value); } else { - if (lobj->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerr); - return; - } - if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) { - addReply(c,shared.cone); - return; + redisPanic("Unknown list encoding"); + } +} + +static robj *listTypePop(robj *subject, int where) { + robj *value = NULL; + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + int pos = (where == REDIS_HEAD) ? 
0 : -1; + p = ziplistIndex(subject->ptr,pos); + if (ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + /* We only need to delete an element when it exists */ + subject->ptr = ziplistDelete(subject->ptr,&p); } - list = lobj->ptr; + } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) { + list *list = subject->ptr; + listNode *ln; if (where == REDIS_HEAD) { - listAddNodeHead(list,c->argv[2]); + ln = listFirst(list); } else { - listAddNodeTail(list,c->argv[2]); + ln = listLast(list); } - incrRefCount(c->argv[2]); + if (ln != NULL) { + value = listNodeValue(ln); + incrRefCount(value); + listDelNode(list,ln); + } + } else { + redisPanic("Unknown list encoding"); } - server.dirty++; - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(list))); + return value; } -static void lpushCommand(redisClient *c) { - pushGenericCommand(c,REDIS_HEAD); +static unsigned long listTypeLength(robj *subject) { + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + return ziplistLen(subject->ptr); + } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) { + return listLength((list*)subject->ptr); + } else { + redisPanic("Unknown list encoding"); + } } -static void rpushCommand(redisClient *c) { - pushGenericCommand(c,REDIS_TAIL); -} +/* Structure to hold set iteration abstraction. */ +typedef struct { + robj *subject; + unsigned char encoding; + unsigned char direction; /* Iteration direction */ + unsigned char *zi; + listNode *ln; +} listTypeIterator; -static void llenCommand(redisClient *c) { - robj *o; - list *l; +/* Structure for an entry while iterating over a list. */ +typedef struct { + listTypeIterator *li; + unsigned char *zi; /* Entry in ziplist */ + listNode *ln; /* Entry in linked list */ +} listTypeEntry; + +/* Initialize an iterator at the specified index. 
*/ +static listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction) { + listTypeIterator *li = zmalloc(sizeof(listTypeIterator)); + li->subject = subject; + li->encoding = subject->encoding; + li->direction = direction; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + li->zi = ziplistIndex(subject->ptr,index); + } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) { + li->ln = listIndex(subject->ptr,index); + } else { + redisPanic("Unknown list encoding"); + } + return li; +} - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_LIST)) return; - - l = o->ptr; - addReplyUlong(c,listLength(l)); +/* Clean up the iterator. */ +static void listTypeReleaseIterator(listTypeIterator *li) { + zfree(li); } -static void lindexCommand(redisClient *c) { - robj *o; - int index = atoi(c->argv[2]->ptr); - list *list; - listNode *ln; +/* Stores pointer to current the entry in the provided entry structure + * and advances the position of the iterator. Returns 1 when the current + * entry is in fact an entry, 0 otherwise. 
*/ +static int listTypeNext(listTypeIterator *li, listTypeEntry *entry) { + /* Protect from converting when iterating */ + redisAssert(li->subject->encoding == li->encoding); - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; + entry->li = li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + entry->zi = li->zi; + if (entry->zi != NULL) { + if (li->direction == REDIS_TAIL) + li->zi = ziplistNext(li->subject->ptr,li->zi); + else + li->zi = ziplistPrev(li->subject->ptr,li->zi); + return 1; + } + } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) { + entry->ln = li->ln; + if (entry->ln != NULL) { + if (li->direction == REDIS_TAIL) + li->ln = li->ln->next; + else + li->ln = li->ln->prev; + return 1; + } + } else { + redisPanic("Unknown list encoding"); + } + return 0; +} - ln = listIndex(list, index); - if (ln == NULL) { - addReply(c,shared.nullbulk); +/* Return entry or NULL at the current position of the iterator. 
*/ +static robj *listTypeGet(listTypeEntry *entry) { + listTypeIterator *li = entry->li; + robj *value = NULL; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *vstr; + unsigned int vlen; + long long vlong; + redisAssert(entry->zi != NULL); + if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + } + } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) { + redisAssert(entry->ln != NULL); + value = listNodeValue(entry->ln); + incrRefCount(value); } else { - robj *ele = listNodeValue(ln); - addReplyBulk(c,ele); + redisPanic("Unknown list encoding"); } + return value; } -static void lsetCommand(redisClient *c) { - robj *o; - int index = atoi(c->argv[2]->ptr); - list *list; - listNode *ln; +static void listTypeInsert(listTypeEntry *entry, robj *value, int where) { + robj *subject = entry->li->subject; + if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) { + value = getDecodedObject(value); + if (where == REDIS_TAIL) { + unsigned char *next = ziplistNext(subject->ptr,entry->zi); - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; + /* When we insert after the current element, but the current element + * is the tail of the list, we need to do a push. 
*/ + if (next == NULL) { + subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL); + } else { + subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr)); + } + } else { + subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr)); + } + decrRefCount(value); + } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) { + if (where == REDIS_TAIL) { + listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL); + } else { + listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD); + } + incrRefCount(value); + } else { + redisPanic("Unknown list encoding"); + } +} - ln = listIndex(list, index); - if (ln == NULL) { - addReply(c,shared.outofrangeerr); +/* Compare the given object with the entry at the current position. */ +static int listTypeEqual(listTypeEntry *entry, robj *o) { + listTypeIterator *li = entry->li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + redisAssert(o->encoding == REDIS_ENCODING_RAW); + return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr)); + } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) { + return equalStringObjects(o,listNodeValue(entry->ln)); } else { - robj *ele = listNodeValue(ln); + redisPanic("Unknown list encoding"); + } +} - decrRefCount(ele); - listNodeValue(ln) = c->argv[3]; - incrRefCount(c->argv[3]); - addReply(c,shared.ok); - server.dirty++; +/* Delete the element pointed to. 
*/ +static void listTypeDelete(listTypeEntry *entry) { + listTypeIterator *li = entry->li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p = entry->zi; + li->subject->ptr = ziplistDelete(li->subject->ptr,&p); + + /* Update position of the iterator depending on the direction */ + if (li->direction == REDIS_TAIL) + li->zi = p; + else + li->zi = ziplistPrev(li->subject->ptr,p); + } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) { + listNode *next; + if (li->direction == REDIS_TAIL) + next = entry->ln->next; + else + next = entry->ln->prev; + listDelNode(li->subject->ptr,entry->ln); + li->ln = next; + } else { + redisPanic("Unknown list encoding"); } } -static void popGenericCommand(redisClient *c, int where) { - robj *o; - list *list; - listNode *ln; +static void listTypeConvert(robj *subject, int enc) { + listTypeIterator *li; + listTypeEntry entry; + redisAssert(subject->type == REDIS_LIST); - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; + if (enc == REDIS_ENCODING_LINKEDLIST) { + list *l = listCreate(); + listSetFreeMethod(l,decrRefCount); - if (where == REDIS_HEAD) - ln = listFirst(list); - else - ln = listLast(list); + /* listTypeGet returns a robj with incremented refcount */ + li = listTypeInitIterator(subject,0,REDIS_TAIL); + while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry)); + listTypeReleaseIterator(li); - if (ln == NULL) { - addReply(c,shared.nullbulk); + subject->encoding = REDIS_ENCODING_LINKEDLIST; + zfree(subject->ptr); + subject->ptr = l; } else { - robj *ele = listNodeValue(ln); - addReplyBulk(c,ele); - listDelNode(list,ln); - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); - server.dirty++; + redisPanic("Unsupported list conversion"); } } -static void lpopCommand(redisClient *c) { - popGenericCommand(c,REDIS_HEAD); +static void pushGenericCommand(redisClient *c, int where) { + robj *lobj = 
lookupKeyWrite(c->db,c->argv[1]); + if (lobj == NULL) { + if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) { + addReply(c,shared.cone); + return; + } + lobj = createZiplistObject(); + dbAdd(c->db,c->argv[1],lobj); + } else { + if (lobj->type != REDIS_LIST) { + addReply(c,shared.wrongtypeerr); + return; + } + if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) { + addReply(c,shared.cone); + return; + } + } + listTypePush(lobj,c->argv[2],where); + addReplyLongLong(c,listTypeLength(lobj)); + server.dirty++; } -static void rpopCommand(redisClient *c) { - popGenericCommand(c,REDIS_TAIL); +static void lpushCommand(redisClient *c) { + pushGenericCommand(c,REDIS_HEAD); } -static void lrangeCommand(redisClient *c) { - robj *o; - int start = atoi(c->argv[2]->ptr); - int end = atoi(c->argv[3]->ptr); - int llen; - int rangelen, j; - list *list; - listNode *ln; - robj *ele; +static void rpushCommand(redisClient *c) { + pushGenericCommand(c,REDIS_TAIL); +} - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullmultibulk)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - llen = listLength(list); +static void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) { + robj *subject; + listTypeIterator *iter; + listTypeEntry entry; + int inserted = 0; + + if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,subject,REDIS_LIST)) return; + + if (refval != NULL) { + /* Note: we expect refval to be string-encoded because it is *not* the + * last argument of the multi-bulk LINSERT. */ + redisAssert(refval->encoding == REDIS_ENCODING_RAW); + + /* We're not sure if this value can be inserted yet, but we cannot + * convert the list inside the iterator. We don't want to loop over + * the list twice (once to see if the value can be inserted and once + * to do the actual insert), so we assume this value can be inserted + * and convert the ziplist to a regular list if necessary. 
*/ + listTypeTryConversion(subject,val); + + /* Seek refval from head to tail */ + iter = listTypeInitIterator(subject,0,REDIS_TAIL); + while (listTypeNext(iter,&entry)) { + if (listTypeEqual(&entry,refval)) { + listTypeInsert(&entry,val,where); + inserted = 1; + break; + } + } + listTypeReleaseIterator(iter); + + if (inserted) { + /* Check if the length exceeds the ziplist length threshold. */ + if (subject->encoding == REDIS_ENCODING_ZIPLIST && + ziplistLen(subject->ptr) > server.list_max_ziplist_entries) + listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); + server.dirty++; + } else { + /* Notify client of a failed insert */ + addReply(c,shared.cnegone); + return; + } + } else { + listTypePush(subject,val,where); + server.dirty++; + } + + addReplyUlong(c,listTypeLength(subject)); +} + +static void lpushxCommand(redisClient *c) { + pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD); +} + +static void rpushxCommand(redisClient *c) { + pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL); +} + +static void linsertCommand(redisClient *c) { + if (strcasecmp(c->argv[2]->ptr,"after") == 0) { + pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL); + } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) { + pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD); + } else { + addReply(c,shared.syntaxerr); + } +} + +static void llenCommand(redisClient *c) { + robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; + addReplyUlong(c,listTypeLength(o)); +} + +static void lindexCommand(redisClient *c) { + robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; + int index = atoi(c->argv[2]->ptr); + robj *value = NULL; + + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + p = ziplistIndex(o->ptr,index); + if (ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + value = 
createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + addReplyBulk(c,value); + decrRefCount(value); + } else { + addReply(c,shared.nullbulk); + } + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + listNode *ln = listIndex(o->ptr,index); + if (ln != NULL) { + value = listNodeValue(ln); + addReplyBulk(c,value); + } else { + addReply(c,shared.nullbulk); + } + } else { + redisPanic("Unknown list encoding"); + } +} + +static void lsetCommand(redisClient *c) { + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; + int index = atoi(c->argv[2]->ptr); + robj *value = c->argv[3]; + + listTypeTryConversion(o,value); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p, *zl = o->ptr; + p = ziplistIndex(zl,index); + if (p == NULL) { + addReply(c,shared.outofrangeerr); + } else { + o->ptr = ziplistDelete(o->ptr,&p); + value = getDecodedObject(value); + o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr)); + decrRefCount(value); + addReply(c,shared.ok); + server.dirty++; + } + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + listNode *ln = listIndex(o->ptr,index); + if (ln == NULL) { + addReply(c,shared.outofrangeerr); + } else { + decrRefCount((robj*)listNodeValue(ln)); + listNodeValue(ln) = value; + incrRefCount(value); + addReply(c,shared.ok); + server.dirty++; + } + } else { + redisPanic("Unknown list encoding"); + } +} + +static void popGenericCommand(redisClient *c, int where) { + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; + + robj *value = listTypePop(o,where); + if (value == NULL) { + addReply(c,shared.nullbulk); + } else { + addReplyBulk(c,value); + decrRefCount(value); + if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); + server.dirty++; + } +} + +static void lpopCommand(redisClient *c) { + popGenericCommand(c,REDIS_HEAD); +} + +static 
void rpopCommand(redisClient *c) { + popGenericCommand(c,REDIS_TAIL); +} + +static void lrangeCommand(redisClient *c) { + robj *o, *value; + int start = atoi(c->argv[2]->ptr); + int end = atoi(c->argv[3]->ptr); + int llen; + int rangelen, j; + listTypeEntry entry; + + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL + || checkType(c,o,REDIS_LIST)) return; + llen = listTypeLength(o); /* convert negative indexes */ if (start < 0) start = llen+start; @@ -4445,13 +5416,15 @@ static void lrangeCommand(redisClient *c) { rangelen = (end-start)+1; /* Return the result in form of a multi-bulk reply */ - ln = listIndex(list, start); addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen)); + listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL); for (j = 0; j < rangelen; j++) { - ele = listNodeValue(ln); - addReplyBulk(c,ele); - ln = ln->next; + redisAssert(listTypeNext(li,&entry)); + value = listTypeGet(&entry); + addReplyBulk(c,value); + decrRefCount(value); } + listTypeReleaseIterator(li); } static void ltrimCommand(redisClient *c) { @@ -4465,8 +5438,7 @@ static void ltrimCommand(redisClient *c) { if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL || checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - llen = listLength(list); + llen = listTypeLength(o); /* convert negative indexes */ if (start < 0) start = llen+start; @@ -4486,49 +5458,63 @@ static void ltrimCommand(redisClient *c) { } /* Remove list elements to perform the trim */ - for (j = 0; j < ltrim; j++) { - ln = listFirst(list); - listDelNode(list,ln); - } - for (j = 0; j < rtrim; j++) { - ln = listLast(list); - listDelNode(list,ln); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + o->ptr = ziplistDeleteRange(o->ptr,0,ltrim); + o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim); + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + list = o->ptr; + for (j = 0; j < ltrim; j++) { + ln = listFirst(list); + listDelNode(list,ln); + } + for (j = 0; j < rtrim; j++) 
{ + ln = listLast(list); + listDelNode(list,ln); + } + } else { + redisPanic("Unknown list encoding"); } - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; addReply(c,shared.ok); } static void lremCommand(redisClient *c) { - robj *o; - list *list; - listNode *ln, *next; + robj *subject, *obj = c->argv[3]; int toremove = atoi(c->argv[2]->ptr); int removed = 0; - int fromtail = 0; + listTypeEntry entry; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; + subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero); + if (subject == NULL || checkType(c,subject,REDIS_LIST)) return; + /* Make sure obj is raw when we're dealing with a ziplist */ + if (subject->encoding == REDIS_ENCODING_ZIPLIST) + obj = getDecodedObject(obj); + + listTypeIterator *li; if (toremove < 0) { toremove = -toremove; - fromtail = 1; + li = listTypeInitIterator(subject,-1,REDIS_HEAD); + } else { + li = listTypeInitIterator(subject,0,REDIS_TAIL); } - ln = fromtail ? list->tail : list->head; - while (ln) { - robj *ele = listNodeValue(ln); - next = fromtail ? ln->prev : ln->next; - if (compareStringObjects(ele,c->argv[3]) == 0) { - listDelNode(list,ln); + while (listTypeNext(li,&entry)) { + if (listTypeEqual(&entry,obj)) { + listTypeDelete(&entry); server.dirty++; removed++; if (toremove && removed == toremove) break; } - ln = next; } - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + listTypeReleaseIterator(li); + + /* Clean up raw encoded object */ + if (subject->encoding == REDIS_ENCODING_ZIPLIST) + decrRefCount(obj); + + if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]); addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed)); } @@ -4548,47 +5534,36 @@ static void lremCommand(redisClient *c) { * as well. This command was originally proposed by Ezra Zygmuntowicz. 
*/ static void rpoplpushcommand(redisClient *c) { - robj *sobj; - list *srclist; - listNode *ln; - + robj *sobj, *value; if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,sobj,REDIS_LIST)) return; - srclist = sobj->ptr; - ln = listLast(srclist); - if (ln == NULL) { + if (listTypeLength(sobj) == 0) { addReply(c,shared.nullbulk); } else { robj *dobj = lookupKeyWrite(c->db,c->argv[2]); - robj *ele = listNodeValue(ln); - list *dstlist; - - if (dobj && dobj->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerr); - return; - } + if (dobj && checkType(c,dobj,REDIS_LIST)) return; + value = listTypePop(sobj,REDIS_TAIL); /* Add the element to the target list (unless it's directly * passed to some BLPOP-ing client */ - if (!handleClientsWaitingListPush(c,c->argv[2],ele)) { - if (dobj == NULL) { - /* Create the list if the key does not exist */ - dobj = createListObject(); - dictAdd(c->db->dict,c->argv[2],dobj); - incrRefCount(c->argv[2]); + if (!handleClientsWaitingListPush(c,c->argv[2],value)) { + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(c->db,c->argv[2],dobj); } - dstlist = dobj->ptr; - listAddNodeHead(dstlist,ele); - incrRefCount(ele); + listTypePush(dobj,value,REDIS_HEAD); } /* Send the element to the client as reply as well */ - addReplyBulk(c,ele); + addReplyBulk(c,value); + + /* listTypePop returns an object with its refcount incremented */ + decrRefCount(value); - /* Finally remove the element from the source list */ - listDelNode(srclist,ln); - if (listLength(srclist) == 0) deleteKey(c->db,c->argv[1]); + /* Delete the source list when it is empty */ + if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; } } @@ -4601,8 +5576,7 @@ static void saddCommand(redisClient *c) { set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { set = createSetObject(); - dictAdd(c->db->dict,c->argv[1],set); - incrRefCount(c->argv[1]); + 
dbAdd(c->db,c->argv[1],set); } else { if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); @@ -4627,7 +5601,7 @@ static void sremCommand(redisClient *c) { if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) { server.dirty++; if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]); addReply(c,shared.cone); } else { addReply(c,shared.czero); @@ -4658,13 +5632,12 @@ static void smoveCommand(redisClient *c) { return; } if (dictSize((dict*)srcset->ptr) == 0 && srcset != dstset) - deleteKey(c->db,c->argv[1]); + dbDelete(c->db,c->argv[1]); server.dirty++; /* Add the element to the destination set */ if (!dstset) { dstset = createSetObject(); - dictAdd(c->db->dict,c->argv[2],dstset); - incrRefCount(c->argv[2]); + dbAdd(c->db,c->argv[2],dstset); } if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK) incrRefCount(c->argv[3]); @@ -4689,7 +5662,7 @@ static void scardCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_SET)) return; - + s = o->ptr; addReplyUlong(c,dictSize(s)); } @@ -4710,7 +5683,7 @@ static void spopCommand(redisClient *c) { addReplyBulk(c,ele); dictDelete(set->ptr,ele); if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; } } @@ -4754,11 +5727,11 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long if (!setobj) { zfree(dv); if (dstkey) { - if (deleteKey(c->db,dstkey)) + if (dbDelete(c->db,dstkey)) server.dirty++; addReply(c,shared.czero); } else { - addReply(c,shared.nullmultibulk); + addReply(c,shared.emptymultibulk); } return; } @@ -4814,11 +5787,10 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long if (dstkey) { /* Store the resulting set into the target, if 
the intersection * is not an empty set. */ - deleteKey(c->db,dstkey); + dbDelete(c->db,dstkey); if (dictSize((dict*)dstset->ptr) > 0) { - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); - addReplyLong(c,dictSize((dict*)dstset->ptr)); + dbAdd(c->db,dstkey,dstset); + addReplyLongLong(c,dictSize((dict*)dstset->ptr)); } else { decrRefCount(dstset); addReply(c,shared.czero); @@ -4917,11 +5889,10 @@ static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnu } else { /* If we have a target key where to store the resulting set * create this key with the result set inside */ - deleteKey(c->db,dstkey); + dbDelete(c->db,dstkey); if (dictSize((dict*)dstset->ptr) > 0) { - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); - addReplyLong(c,dictSize((dict*)dstset->ptr)); + dbAdd(c->db,dstkey,dstset); + addReplyLongLong(c,dictSize((dict*)dstset->ptr)); } else { decrRefCount(dstset); addReply(c,shared.czero); @@ -4970,8 +5941,10 @@ static zskiplistNode *zslCreateNode(int level, double score, robj *obj) { zskiplistNode *zn = zmalloc(sizeof(*zn)); zn->forward = zmalloc(sizeof(zskiplistNode*) * level); - if (level > 0) + if (level > 1) zn->span = zmalloc(sizeof(unsigned int) * (level - 1)); + else + zn->span = NULL; zn->score = score; zn->obj = obj; return zn; @@ -4980,7 +5953,7 @@ static zskiplistNode *zslCreateNode(int level, double score, robj *obj) { static zskiplist *zslCreate(void) { int j; zskiplist *zsl; - + zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; @@ -5022,7 +5995,7 @@ static int zslRandomLevel(void) { int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1; - return level; + return (levelforward[0]; - if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) { + if (x && score == x->score && equalStringObjects(x->obj,obj)) { zslDeleteNode(zsl, x, update); zslFreeNode(x); return 1; @@ -5213,7 +6186,7 @@ static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) { * 
Returns 0 when the element cannot be found, rank otherwise. * Note that the rank is 1-based due to the span of zsl->header to the * first element. */ -static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { +static unsigned long zslistTypeGetRank(zskiplist *zsl, double score, robj *o) { zskiplistNode *x; unsigned long rank = 0; int i; @@ -5229,7 +6202,7 @@ static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { } /* x might be equal to zsl->header, so test if obj is non-NULL */ - if (x->obj && compareStringObjects(x->obj,o) == 0) { + if (x->obj && equalStringObjects(x->obj,o)) { return rank; } } @@ -5237,7 +6210,7 @@ static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { } /* Finds an element by its rank. The rank argument needs to be 1-based. */ -zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { +zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; int i; @@ -5266,11 +6239,15 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor zset *zs; double *score; + if (isnan(scoreval)) { + addReplySds(c,sdsnew("-ERR provide score is Not A Number (nan)\r\n")); + return; + } + zsetobj = lookupKeyWrite(c->db,key); if (zsetobj == NULL) { zsetobj = createZsetObject(); - dictAdd(c->db->dict,key,zsetobj); - incrRefCount(key); + dbAdd(c->db,key,zsetobj); } else { if (zsetobj->type != REDIS_ZSET) { addReply(c,shared.wrongtypeerr); @@ -5294,6 +6271,15 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor } else { *score = scoreval; } + if (isnan(*score)) { + addReplySds(c, + sdsnew("-ERR resulting score is Not A Number (nan)\r\n")); + zfree(score); + /* Note that we don't need to check if the zset may be empty and + * should be removed here, as we can only obtain Nan as score if + * there was already an element in the sorted set. 
*/ + return; + } } else { *score = scoreval; } @@ -5313,7 +6299,7 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor } else { dictEntry *de; double *oldscore; - + /* case 2: Score update operation */ de = dictFind(zs->dict,ele); redisAssert(de != NULL); @@ -5342,14 +6328,14 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor static void zaddCommand(redisClient *c) { double scoreval; - scoreval = strtod(c->argv[2]->ptr,NULL); + if (getDoubleFromObjectOrReply(c, c->argv[2], &scoreval, NULL) != REDIS_OK) return; zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0); } static void zincrbyCommand(redisClient *c) { double scoreval; - scoreval = strtod(c->argv[2]->ptr,NULL); + if (getDoubleFromObjectOrReply(c, c->argv[2], &scoreval, NULL) != REDIS_OK) return; zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1); } @@ -5377,37 +6363,43 @@ static void zremCommand(redisClient *c) { /* Delete from the hash table */ dictDelete(zs->dict,c->argv[2]); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; addReply(c,shared.cone); } static void zremrangebyscoreCommand(redisClient *c) { - double min = strtod(c->argv[2]->ptr,NULL); - double max = strtod(c->argv[3]->ptr,NULL); + double min; + double max; long deleted; robj *zsetobj; zset *zs; + if ((getDoubleFromObjectOrReply(c, c->argv[2], &min, NULL) != REDIS_OK) || + (getDoubleFromObjectOrReply(c, c->argv[3], &max, NULL) != REDIS_OK)) return; + if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,zsetobj,REDIS_ZSET)) return; zs = zsetobj->ptr; deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty += deleted; - 
addReplyLong(c,deleted); + addReplyLongLong(c,deleted); } static void zremrangebyrankCommand(redisClient *c) { - int start = atoi(c->argv[2]->ptr); - int end = atoi(c->argv[3]->ptr); + long start; + long end; int llen; long deleted; robj *zsetobj; zset *zs; + if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) || + (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return; + if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,zsetobj,REDIS_ZSET)) return; zs = zsetobj->ptr; @@ -5430,9 +6422,9 @@ static void zremrangebyrankCommand(redisClient *c) { * use 1-based rank */ deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty += deleted; - addReplyLong(c, deleted); + addReplyLongLong(c, deleted); } typedef struct { @@ -5451,6 +6443,7 @@ static int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) { #define REDIS_AGGR_SUM 1 #define REDIS_AGGR_MIN 2 #define REDIS_AGGR_MAX 3 +#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e)) inline static void zunionInterAggregate(double *target, double val, int aggregate) { if (aggregate == REDIS_AGGR_SUM) { @@ -5461,12 +6454,12 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat *target = val > *target ? 
val : *target; } else { /* safety net */ - redisAssert(0 != 0); + redisPanic("Unknown ZUNION/INTER aggregate type"); } } static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { - int i, j, zsetnum; + int i, j, setnum; int aggregate = REDIS_AGGR_SUM; zsetopsrc *src; robj *dstobj; @@ -5474,32 +6467,35 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictIterator *di; dictEntry *de; - /* expect zsetnum input keys to be given */ - zsetnum = atoi(c->argv[2]->ptr); - if (zsetnum < 1) { - addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNION/ZINTER\r\n")); + /* expect setnum input keys to be given */ + setnum = atoi(c->argv[2]->ptr); + if (setnum < 1) { + addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n")); return; } /* test if the expected number of keys would overflow */ - if (3+zsetnum > c->argc) { + if (3+setnum > c->argc) { addReply(c,shared.syntaxerr); return; } /* read keys to be used for input */ - src = zmalloc(sizeof(zsetopsrc) * zsetnum); - for (i = 0, j = 3; i < zsetnum; i++, j++) { - robj *zsetobj = lookupKeyWrite(c->db,c->argv[j]); - if (!zsetobj) { + src = zmalloc(sizeof(zsetopsrc) * setnum); + for (i = 0, j = 3; i < setnum; i++, j++) { + robj *obj = lookupKeyWrite(c->db,c->argv[j]); + if (!obj) { src[i].dict = NULL; } else { - if (zsetobj->type != REDIS_ZSET) { + if (obj->type == REDIS_ZSET) { + src[i].dict = ((zset*)obj->ptr)->dict; + } else if (obj->type == REDIS_SET) { + src[i].dict = (obj->ptr); + } else { zfree(src); addReply(c,shared.wrongtypeerr); return; } - src[i].dict = ((zset*)zsetobj->ptr)->dict; } /* default all weights to 1 */ @@ -5511,10 +6507,11 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { int remaining = c->argc - j; while (remaining) { - if (remaining >= (zsetnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { + if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { j++; 
remaining--; - for (i = 0; i < zsetnum; i++, j++, remaining--) { - src[i].weight = strtod(c->argv[j]->ptr, NULL); + for (i = 0; i < setnum; i++, j++, remaining--) { + if (getDoubleFromObjectOrReply(c, c->argv[j], &src[i].weight, NULL) != REDIS_OK) + return; } } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) { j++; remaining--; @@ -5540,7 +6537,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { /* sort sets from the smallest to largest, this will improve our * algorithm's performance */ - qsort(src,zsetnum,sizeof(zsetopsrc), qsortCompareZsetopsrcByCardinality); + qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality); dstobj = createZsetObject(); dstzset = dstobj->ptr; @@ -5553,12 +6550,12 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { di = dictGetIterator(src[0].dict); while((de = dictNext(di)) != NULL) { double *score = zmalloc(sizeof(double)), value; - *score = src[0].weight * (*(double*)dictGetEntryVal(de)); + *score = src[0].weight * zunionInterDictValue(de); - for (j = 1; j < zsetnum; j++) { + for (j = 1; j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { - value = src[j].weight * (*(double*)dictGetEntryVal(other)); + value = src[j].weight * zunionInterDictValue(other); zunionInterAggregate(score, value, aggregate); } else { break; @@ -5566,7 +6563,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { } /* skip entry when not present in every source dict */ - if (j != zsetnum) { + if (j != setnum) { zfree(score); } else { robj *o = dictGetEntryKey(de); @@ -5579,7 +6576,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictReleaseIterator(di); } } else if (op == REDIS_OP_UNION) { - for (i = 0; i < zsetnum; i++) { + for (i = 0; i < setnum; i++) { if (!src[i].dict) continue; di = dictGetIterator(src[i].dict); @@ -5588,14 +6585,14 @@ static void 
zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue; double *score = zmalloc(sizeof(double)), value; - *score = src[i].weight * (*(double*)dictGetEntryVal(de)); + *score = src[i].weight * zunionInterDictValue(de); /* because the zsets are sorted by size, its only possible * for sets at larger indices to hold this entry */ - for (j = (i+1); j < zsetnum; j++) { + for (j = (i+1); j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { - value = src[j].weight * (*(double*)dictGetEntryVal(other)); + value = src[j].weight * zunionInterDictValue(other); zunionInterAggregate(score, value, aggregate); } } @@ -5613,31 +6610,30 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION); } - deleteKey(c->db,dstkey); + dbDelete(c->db,dstkey); if (dstzset->zsl->length) { - dictAdd(c->db->dict,dstkey,dstobj); - incrRefCount(dstkey); - addReplyLong(c, dstzset->zsl->length); + dbAdd(c->db,dstkey,dstobj); + addReplyLongLong(c, dstzset->zsl->length); server.dirty++; } else { - decrRefCount(dstzset); + decrRefCount(dstobj); addReply(c, shared.czero); } zfree(src); } -static void zunionCommand(redisClient *c) { +static void zunionstoreCommand(redisClient *c) { zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION); } -static void zinterCommand(redisClient *c) { +static void zinterstoreCommand(redisClient *c) { zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER); } static void zrangeGenericCommand(redisClient *c, int reverse) { robj *o; - int start = atoi(c->argv[2]->ptr); - int end = atoi(c->argv[3]->ptr); + long start; + long end; int withscores = 0; int llen; int rangelen, j; @@ -5646,6 +6642,9 @@ static void zrangeGenericCommand(redisClient *c, int reverse) { zskiplistNode *ln; robj *ele; + if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) || + 
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return; + if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) { withscores = 1; } else if (c->argc >= 5) { @@ -5653,8 +6652,8 @@ static void zrangeGenericCommand(redisClient *c, int reverse) { return; } - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullmultibulk)) == NULL || - checkType(c,o,REDIS_ZSET)) return; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL + || checkType(c,o,REDIS_ZSET)) return; zsetobj = o->ptr; zsl = zsetobj->zsl; llen = zsl->length; @@ -5677,10 +6676,10 @@ static void zrangeGenericCommand(redisClient *c, int reverse) { /* check if starting point is trivial, before searching * the element in log(N) time */ if (reverse) { - ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start); + ln = start == 0 ? zsl->tail : zslistTypeGetElementByRank(zsl, llen-start); } else { ln = start == 0 ? - zsl->header->forward[0] : zslGetElementByRank(zsl, start+1); + zsl->header->forward[0] : zslistTypeGetElementByRank(zsl, start+1); } /* Return the result in form of a multi-bulk reply */ @@ -5760,7 +6759,7 @@ static void genericZrangebyscoreCommand(redisClient *c, int justcount) { /* Ok, lookup the key and get the range */ o = lookupKeyRead(c->db,c->argv[1]); if (o == NULL) { - addReply(c,justcount ? shared.czero : shared.nullmultibulk); + addReply(c,justcount ? shared.czero : shared.emptymultibulk); } else { if (o->type != REDIS_ZSET) { addReply(c,shared.wrongtypeerr); @@ -5810,7 +6809,7 @@ static void genericZrangebyscoreCommand(redisClient *c, int justcount) { if (limit > 0) limit--; } if (justcount) { - addReplyLong(c,(long)rangelen); + addReplyLongLong(c,(long)rangelen); } else { lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n", withscores ? 
(rangelen*2) : rangelen); @@ -5877,12 +6876,12 @@ static void zrankGenericCommand(redisClient *c, int reverse) { } score = dictGetEntryVal(de); - rank = zslGetRank(zsl, *score, c->argv[2]); + rank = zslistTypeGetRank(zsl, *score, c->argv[2]); if (rank) { if (reverse) { - addReplyLong(c, zsl->length - rank); + addReplyLongLong(c, zsl->length - rank); } else { - addReplyLong(c, rank-1); + addReplyLongLong(c, rank-1); } } else { addReply(c,shared.nullbulk); @@ -5897,225 +6896,388 @@ static void zrevrankCommand(redisClient *c) { zrankGenericCommand(c, 1); } -/* =================================== Hashes =============================== */ -static void hsetCommand(redisClient *c) { - int update = 0; - robj *o = lookupKeyWrite(c->db,c->argv[1]); +/* ========================= Hashes utility functions ======================= */ +#define REDIS_HASH_KEY 1 +#define REDIS_HASH_VALUE 2 - if (o == NULL) { - o = createHashObject(); - dictAdd(c->db->dict,c->argv[1],o); - incrRefCount(c->argv[1]); - } else { - if (o->type != REDIS_HASH) { - addReply(c,shared.wrongtypeerr); +/* Check the length of a number of objects to see if we need to convert a + * zipmap to a real hash. Note that we only check string encoded objects + * as their string length can be queried in constant time. */ +static void hashTypeTryConversion(robj *subject, robj **argv, int start, int end) { + int i; + if (subject->encoding != REDIS_ENCODING_ZIPMAP) return; + + for (i = start; i <= end; i++) { + if (argv[i]->encoding == REDIS_ENCODING_RAW && + sdslen(argv[i]->ptr) > server.hash_max_zipmap_value) + { + convertToRealHash(subject); return; } } - /* We want to convert the zipmap into an hash table right now if the - * entry to be added is too big. Note that we check if the object - * is integer encoded before to try fetching the length in the test below. - * This is because integers are small, but currently stringObjectLen() - * performs a slow conversion: not worth it. 
*/ - if (o->encoding == REDIS_ENCODING_ZIPMAP && - ((c->argv[2]->encoding == REDIS_ENCODING_RAW && - sdslen(c->argv[2]->ptr) > server.hash_max_zipmap_value) || - (c->argv[3]->encoding == REDIS_ENCODING_RAW && - sdslen(c->argv[3]->ptr) > server.hash_max_zipmap_value))) - { - convertToRealHash(o); +} + +/* Encode given objects in-place when the hash uses a dict. */ +static void hashTypeTryObjectEncoding(robj *subject, robj **o1, robj **o2) { + if (subject->encoding == REDIS_ENCODING_HT) { + if (o1) *o1 = tryObjectEncoding(*o1); + if (o2) *o2 = tryObjectEncoding(*o2); } +} +/* Get the value from a hash identified by key. Returns either a string + * object or NULL if the value cannot be found. The refcount of the object + * is always increased by 1 when the value was found. */ +static robj *hashTypeGet(robj *o, robj *key) { + robj *value = NULL; if (o->encoding == REDIS_ENCODING_ZIPMAP) { - unsigned char *zm = o->ptr; - robj *valobj = getDecodedObject(c->argv[3]); - - zm = zipmapSet(zm,c->argv[2]->ptr,sdslen(c->argv[2]->ptr), - valobj->ptr,sdslen(valobj->ptr),&update); - decrRefCount(valobj); - o->ptr = zm; - - /* And here there is the second check for hash conversion... - * we want to do it only if the operation was not just an update as - * zipmapLen() is O(N). */ - if (!update && zipmapLen(zm) > server.hash_max_zipmap_entries) - convertToRealHash(o); + unsigned char *v; + unsigned int vlen; + key = getDecodedObject(key); + if (zipmapGet(o->ptr,key->ptr,sdslen(key->ptr),&v,&vlen)) { + value = createStringObject((char*)v,vlen); + } + decrRefCount(key); } else { - tryObjectEncoding(c->argv[2]); - /* note that c->argv[3] is already encoded, as the latest arg - * of a bulk command is always integer encoded if possible. 
*/ - if (dictReplace(o->ptr,c->argv[2],c->argv[3])) { - incrRefCount(c->argv[2]); - } else { - update = 1; + dictEntry *de = dictFind(o->ptr,key); + if (de != NULL) { + value = dictGetEntryVal(de); + incrRefCount(value); } - incrRefCount(c->argv[3]); } - server.dirty++; - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",update == 0)); + return value; } -static void hgetCommand(redisClient *c) { - robj *o; - - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_HASH)) return; - +/* Test if the key exists in the given hash. Returns 1 if the key + * exists and 0 when it doesn't. */ +static int hashTypeExists(robj *o, robj *key) { if (o->encoding == REDIS_ENCODING_ZIPMAP) { - unsigned char *zm = o->ptr; - unsigned char *val; - unsigned int vlen; - robj *field; - - field = getDecodedObject(c->argv[2]); - if (zipmapGet(zm,field->ptr,sdslen(field->ptr), &val,&vlen)) { - addReplySds(c,sdscatprintf(sdsempty(),"$%u\r\n", vlen)); - addReplySds(c,sdsnewlen(val,vlen)); - addReply(c,shared.crlf); - decrRefCount(field); - return; - } else { - addReply(c,shared.nullbulk); - decrRefCount(field); - return; + key = getDecodedObject(key); + if (zipmapExists(o->ptr,key->ptr,sdslen(key->ptr))) { + decrRefCount(key); + return 1; } + decrRefCount(key); } else { - struct dictEntry *de; + if (dictFind(o->ptr,key) != NULL) { + return 1; + } + } + return 0; +} - de = dictFind(o->ptr,c->argv[2]); - if (de == NULL) { - addReply(c,shared.nullbulk); - } else { - robj *e = dictGetEntryVal(de); +/* Add an element, discard the old if the key already exists. + * Return 0 on insert and 1 on update. 
*/ +static int hashTypeSet(robj *o, robj *key, robj *value) { + int update = 0; + if (o->encoding == REDIS_ENCODING_ZIPMAP) { + key = getDecodedObject(key); + value = getDecodedObject(value); + o->ptr = zipmapSet(o->ptr, + key->ptr,sdslen(key->ptr), + value->ptr,sdslen(value->ptr), &update); + decrRefCount(key); + decrRefCount(value); - addReplyBulk(c,e); + /* Check if the zipmap needs to be upgraded to a real hash table */ + if (zipmapLen(o->ptr) > server.hash_max_zipmap_entries) + convertToRealHash(o); + } else { + if (dictReplace(o->ptr,key,value)) { + /* Insert */ + incrRefCount(key); + } else { + /* Update */ + update = 1; } + incrRefCount(value); } + return update; } -static void hdelCommand(redisClient *c) { - robj *o; +/* Delete an element from a hash. + * Return 1 on deleted and 0 on not found. */ +static int hashTypeDelete(robj *o, robj *key) { int deleted = 0; - - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_HASH)) return; - if (o->encoding == REDIS_ENCODING_ZIPMAP) { - robj *field = getDecodedObject(c->argv[2]); - - o->ptr = zipmapDel((unsigned char*) o->ptr, - (unsigned char*) field->ptr, - sdslen(field->ptr), &deleted); - decrRefCount(field); - if (zipmapLen((unsigned char*) o->ptr) == 0) - deleteKey(c->db,c->argv[1]); + key = getDecodedObject(key); + o->ptr = zipmapDel(o->ptr,key->ptr,sdslen(key->ptr), &deleted); + decrRefCount(key); } else { - deleted = dictDelete((dict*)o->ptr,c->argv[2]) == DICT_OK; - if (htNeedsResize(o->ptr)) dictResize(o->ptr); - if (dictSize((dict*)o->ptr) == 0) deleteKey(c->db,c->argv[1]); + deleted = dictDelete((dict*)o->ptr,key) == DICT_OK; + /* Always check if the dictionary needs a resize after a delete. */ + if (deleted && htNeedsResize(o->ptr)) dictResize(o->ptr); } - if (deleted) server.dirty++; - addReply(c,deleted ? 
shared.cone : shared.czero); + return deleted; } -static void hlenCommand(redisClient *c) { - robj *o; - unsigned long len; +/* Return the number of elements in a hash. */ +static unsigned long hashTypeLength(robj *o) { + return (o->encoding == REDIS_ENCODING_ZIPMAP) ? + zipmapLen((unsigned char*)o->ptr) : dictSize((dict*)o->ptr); +} - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_HASH)) return; +/* Structure to hold hash iteration abstraction. Note that iteration over + * hashes involves both fields and values. Because it is possible that + * not both are required, store pointers in the iterator to avoid + * unnecessary memory allocation for fields/values. */ +typedef struct { + int encoding; + unsigned char *zi; + unsigned char *zk, *zv; + unsigned int zklen, zvlen; - len = (o->encoding == REDIS_ENCODING_ZIPMAP) ? - zipmapLen((unsigned char*)o->ptr) : dictSize((dict*)o->ptr); - addReplyUlong(c,len); + dictIterator *di; + dictEntry *de; +} hashTypeIterator; + +static hashTypeIterator *hashTypeInitIterator(robj *subject) { + hashTypeIterator *hi = zmalloc(sizeof(hashTypeIterator)); + hi->encoding = subject->encoding; + if (hi->encoding == REDIS_ENCODING_ZIPMAP) { + hi->zi = zipmapRewind(subject->ptr); + } else if (hi->encoding == REDIS_ENCODING_HT) { + hi->di = dictGetIterator(subject->ptr); + } else { + redisAssert(NULL); + } + return hi; } -#define REDIS_GETALL_KEYS 1 -#define REDIS_GETALL_VALS 2 -static void genericHgetallCommand(redisClient *c, int flags) { - robj *o, *lenobj; - unsigned long count = 0; - - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullmultibulk)) == NULL - || checkType(c,o,REDIS_HASH)) return; +static void hashTypeReleaseIterator(hashTypeIterator *hi) { + if (hi->encoding == REDIS_ENCODING_HT) { + dictReleaseIterator(hi->di); + } + zfree(hi); +} - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); +/* Move to the next entry in the hash. 
Return REDIS_OK when the next entry + * could be found and REDIS_ERR when the iterator reaches the end. */ +static int hashTypeNext(hashTypeIterator *hi) { + if (hi->encoding == REDIS_ENCODING_ZIPMAP) { + if ((hi->zi = zipmapNext(hi->zi, &hi->zk, &hi->zklen, + &hi->zv, &hi->zvlen)) == NULL) return REDIS_ERR; + } else { + if ((hi->de = dictNext(hi->di)) == NULL) return REDIS_ERR; + } + return REDIS_OK; +} - if (o->encoding == REDIS_ENCODING_ZIPMAP) { - unsigned char *p = zipmapRewind(o->ptr); - unsigned char *field, *val; - unsigned int flen, vlen; - - while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { - robj *aux; - - if (flags & REDIS_GETALL_KEYS) { - aux = createStringObject((char*)field,flen); - addReplyBulk(c,aux); - decrRefCount(aux); - count++; - } - if (flags & REDIS_GETALL_VALS) { - aux = createStringObject((char*)val,vlen); - addReplyBulk(c,aux); - decrRefCount(aux); - count++; - } +/* Get key or value object at current iteration position. + * This increases the refcount of the field object by 1. 
*/ +static robj *hashTypeCurrent(hashTypeIterator *hi, int what) { + robj *o; + if (hi->encoding == REDIS_ENCODING_ZIPMAP) { + if (what & REDIS_HASH_KEY) { + o = createStringObject((char*)hi->zk,hi->zklen); + } else { + o = createStringObject((char*)hi->zv,hi->zvlen); } } else { - dictIterator *di = dictGetIterator(o->ptr); - dictEntry *de; - - while((de = dictNext(di)) != NULL) { - robj *fieldobj = dictGetEntryKey(de); - robj *valobj = dictGetEntryVal(de); - - if (flags & REDIS_GETALL_KEYS) { - addReplyBulk(c,fieldobj); - count++; - } - if (flags & REDIS_GETALL_VALS) { - addReplyBulk(c,valobj); - count++; - } + if (what & REDIS_HASH_KEY) { + o = dictGetEntryKey(hi->de); + } else { + o = dictGetEntryVal(hi->de); } - dictReleaseIterator(di); + incrRefCount(o); } - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",count); + return o; } -static void hkeysCommand(redisClient *c) { - genericHgetallCommand(c,REDIS_GETALL_KEYS); +static robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key) { + robj *o = lookupKeyWrite(c->db,key); + if (o == NULL) { + o = createHashObject(); + dbAdd(c->db,key,o); + } else { + if (o->type != REDIS_HASH) { + addReply(c,shared.wrongtypeerr); + return NULL; + } + } + return o; } -static void hvalsCommand(redisClient *c) { - genericHgetallCommand(c,REDIS_GETALL_VALS); -} +/* ============================= Hash commands ============================== */ +static void hsetCommand(redisClient *c) { + int update; + robj *o; -static void hgetallCommand(redisClient *c) { - genericHgetallCommand(c,REDIS_GETALL_KEYS|REDIS_GETALL_VALS); + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + hashTypeTryConversion(o,c->argv,2,3); + hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]); + update = hashTypeSet(o,c->argv[2],c->argv[3]); + addReply(c, update ? 
shared.czero : shared.cone); + server.dirty++; } -static void hexistsCommand(redisClient *c) { +static void hsetnxCommand(redisClient *c) { robj *o; - int exists = 0; - - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_HASH)) return; - - if (o->encoding == REDIS_ENCODING_ZIPMAP) { - robj *field; - unsigned char *zm = o->ptr; + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + hashTypeTryConversion(o,c->argv,2,3); - field = getDecodedObject(c->argv[2]); - exists = zipmapExists(zm,field->ptr,sdslen(field->ptr)); - decrRefCount(field); + if (hashTypeExists(o, c->argv[2])) { + addReply(c, shared.czero); } else { - exists = dictFind(o->ptr,c->argv[2]) != NULL; + hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]); + hashTypeSet(o,c->argv[2],c->argv[3]); + addReply(c, shared.cone); + server.dirty++; + } +} + +static void hmsetCommand(redisClient *c) { + int i; + robj *o; + + if ((c->argc % 2) == 1) { + addReplySds(c,sdsnew("-ERR wrong number of arguments for HMSET\r\n")); + return; + } + + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + hashTypeTryConversion(o,c->argv,2,c->argc-1); + for (i = 2; i < c->argc; i += 2) { + hashTypeTryObjectEncoding(o,&c->argv[i], &c->argv[i+1]); + hashTypeSet(o,c->argv[i],c->argv[i+1]); + } + addReply(c, shared.ok); + server.dirty++; +} + +static void hincrbyCommand(redisClient *c) { + long long value, incr; + robj *o, *current, *new; + + if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return; + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + if ((current = hashTypeGet(o,c->argv[2])) != NULL) { + if (getLongLongFromObjectOrReply(c,current,&value, + "hash value is not an integer") != REDIS_OK) { + decrRefCount(current); + return; + } + decrRefCount(current); + } else { + value = 0; + } + + value += incr; + new = createStringObjectFromLongLong(value); + hashTypeTryObjectEncoding(o,&c->argv[2],NULL); + 
hashTypeSet(o,c->argv[2],new); + decrRefCount(new); + addReplyLongLong(c,value); + server.dirty++; +} + +static void hgetCommand(redisClient *c) { + robj *o, *value; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || + checkType(c,o,REDIS_HASH)) return; + + if ((value = hashTypeGet(o,c->argv[2])) != NULL) { + addReplyBulk(c,value); + decrRefCount(value); + } else { + addReply(c,shared.nullbulk); + } +} + +static void hmgetCommand(redisClient *c) { + int i; + robj *o, *value; + o = lookupKeyRead(c->db,c->argv[1]); + if (o != NULL && o->type != REDIS_HASH) { + addReply(c,shared.wrongtypeerr); + } + + /* Note the check for o != NULL happens inside the loop. This is + * done because objects that cannot be found are considered to be + * an empty hash. The reply should then be a series of NULLs. */ + addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-2)); + for (i = 2; i < c->argc; i++) { + if (o != NULL && (value = hashTypeGet(o,c->argv[i])) != NULL) { + addReplyBulk(c,value); + decrRefCount(value); + } else { + addReply(c,shared.nullbulk); + } + } +} + +static void hdelCommand(redisClient *c) { + robj *o; + if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,o,REDIS_HASH)) return; + + if (hashTypeDelete(o,c->argv[2])) { + if (hashTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); + addReply(c,shared.cone); + server.dirty++; + } else { + addReply(c,shared.czero); + } +} + +static void hlenCommand(redisClient *c) { + robj *o; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,o,REDIS_HASH)) return; + + addReplyUlong(c,hashTypeLength(o)); +} + +static void genericHgetallCommand(redisClient *c, int flags) { + robj *o, *lenobj, *obj; + unsigned long count = 0; + hashTypeIterator *hi; + + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL + || checkType(c,o,REDIS_HASH)) return; + + lenobj = createObject(REDIS_STRING,NULL); + addReply(c,lenobj); + 
decrRefCount(lenobj); + + hi = hashTypeInitIterator(o); + while (hashTypeNext(hi) != REDIS_ERR) { + if (flags & REDIS_HASH_KEY) { + obj = hashTypeCurrent(hi,REDIS_HASH_KEY); + addReplyBulk(c,obj); + decrRefCount(obj); + count++; + } + if (flags & REDIS_HASH_VALUE) { + obj = hashTypeCurrent(hi,REDIS_HASH_VALUE); + addReplyBulk(c,obj); + decrRefCount(obj); + count++; + } } - addReply(c,exists ? shared.cone : shared.czero); + hashTypeReleaseIterator(hi); + + lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",count); +} + +static void hkeysCommand(redisClient *c) { + genericHgetallCommand(c,REDIS_HASH_KEY); +} + +static void hvalsCommand(redisClient *c) { + genericHgetallCommand(c,REDIS_HASH_VALUE); +} + +static void hgetallCommand(redisClient *c) { + genericHgetallCommand(c,REDIS_HASH_KEY|REDIS_HASH_VALUE); +} + +static void hexistsCommand(redisClient *c) { + robj *o; + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,o,REDIS_HASH)) return; + + addReply(c, hashTypeExists(o,c->argv[2]) ? 
shared.cone : shared.czero); } static void convertToRealHash(robj *o) { @@ -6130,8 +7292,8 @@ static void convertToRealHash(robj *o) { keyobj = createStringObject((char*)key,klen); valobj = createStringObject((char*)val,vlen); - tryObjectEncoding(keyobj); - tryObjectEncoding(valobj); + keyobj = tryObjectEncoding(keyobj); + valobj = tryObjectEncoding(valobj); dictAdd(dict,keyobj,valobj); } o->encoding = REDIS_ENCODING_HT; @@ -6143,12 +7305,14 @@ static void convertToRealHash(robj *o) { static void flushdbCommand(redisClient *c) { server.dirty += dictSize(c->db->dict); + touchWatchedKeysOnFlush(c->db->id); dictEmpty(c->db->dict); dictEmpty(c->db->expires); addReply(c,shared.ok); } static void flushallCommand(redisClient *c) { + touchWatchedKeysOnFlush(-1); server.dirty += emptyDb(); addReply(c,shared.ok); if (server.bgsavechildpid != -1) { @@ -6167,23 +7331,26 @@ static redisSortOperation *createSortOperation(int type, robj *pattern) { } /* Return the value associated to the key with a name obtained - * substituting the first occurence of '*' in 'pattern' with 'subst' */ + * substituting the first occurence of '*' in 'pattern' with 'subst'. + * The returned object will always have its refcount increased by 1 + * when it is non-NULL. */ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { - char *p; + char *p, *f; sds spat, ssub; - robj keyobj; - int prefixlen, sublen, postfixlen; + robj keyobj, fieldobj, *o; + int prefixlen, sublen, postfixlen, fieldlen; /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */ struct { long len; long free; char buf[REDIS_SORTKEY_MAX+1]; - } keyname; + } keyname, fieldname; /* If the pattern is "#" return the substitution object itself in order * to implement the "SORT ... GET #" feature. 
*/ spat = pattern->ptr; if (spat[0] == '#' && spat[1] == '\0') { + incrRefCount(subst); return subst; } @@ -6200,20 +7367,47 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { return NULL; } + /* Find out if we're dealing with a hash dereference. */ + if ((f = strstr(p+1, "->")) != NULL) { + fieldlen = sdslen(spat)-(f-spat); + /* this also copies \0 character */ + memcpy(fieldname.buf,f+2,fieldlen-1); + fieldname.len = fieldlen-2; + } else { + fieldlen = 0; + } + prefixlen = p-spat; sublen = sdslen(ssub); - postfixlen = sdslen(spat)-(prefixlen+1); + postfixlen = sdslen(spat)-(prefixlen+1)-fieldlen; memcpy(keyname.buf,spat,prefixlen); memcpy(keyname.buf+prefixlen,ssub,sublen); memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen); keyname.buf[prefixlen+sublen+postfixlen] = '\0'; keyname.len = prefixlen+sublen+postfixlen; - - initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2)) decrRefCount(subst); - /* printf("lookup '%s' => %p\n", keyname.buf,de); */ - return lookupKeyRead(db,&keyobj); + /* Lookup substituted key */ + initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2)); + o = lookupKeyRead(db,&keyobj); + if (o == NULL) return NULL; + + if (fieldlen > 0) { + if (o->type != REDIS_HASH || fieldname.len < 1) return NULL; + + /* Retrieve value from hash by the field name. This operation + * already increases the refcount of the returned object. */ + initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(long)*2)); + o = hashTypeGet(o, &fieldobj); + } else { + if (o->type != REDIS_STRING) return NULL; + + /* Every object that this function returns needs to have its refcount + * increased. sortCommand decreases it again. */ + incrRefCount(o); + } + + return o; } /* sortCompare() is used by qsort in sortCommand(). 
Given that qsort_r with @@ -6248,14 +7442,8 @@ static int sortCompare(const void *s1, const void *s2) { cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr); } } else { - /* Compare elements directly */ - robj *dec1, *dec2; - - dec1 = getDecodedObject(so1->obj); - dec2 = getDecodedObject(so2->obj); - cmp = strcoll(dec1->ptr,dec2->ptr); - decrRefCount(dec1); - decrRefCount(dec2); + /* Compare elements directly. */ + cmp = compareStringObjects(so1->obj,so2->obj); } } return server.sort_desc ? -cmp : cmp; @@ -6265,7 +7453,7 @@ static int sortCompare(const void *s1, const void *s2) { * is optimized for speed and a bit less for readability */ static void sortCommand(redisClient *c) { list *operations; - int outputlen = 0; + unsigned int outputlen = 0; int desc = 0, alpha = 0; int limit_start = 0, limit_count = -1, start, end; int j, dontsort = 0, vectorlen; @@ -6276,7 +7464,7 @@ static void sortCommand(redisClient *c) { /* Lookup the key to sort. It must be of the right types */ sortval = lookupKeyRead(c->db,c->argv[1]); if (sortval == NULL) { - addReply(c,shared.nullmultibulk); + addReply(c,shared.emptymultibulk); return; } if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST && @@ -6335,27 +7523,24 @@ static void sortCommand(redisClient *c) { /* Load the sorting vector with all the objects to sort */ switch(sortval->type) { - case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break; + case REDIS_LIST: vectorlen = listTypeLength(sortval); break; case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break; case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break; - default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */ + default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */ } vector = zmalloc(sizeof(redisSortObject)*vectorlen); j = 0; if (sortval->type == REDIS_LIST) { - list *list = sortval->ptr; - listNode *ln; - listIter li; - - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *ele 
= ln->value; - vector[j].obj = ele; + listTypeIterator *li = listTypeInitIterator(sortval,0,REDIS_TAIL); + listTypeEntry entry; + while(listTypeNext(li,&entry)) { + vector[j].obj = listTypeGet(&entry); vector[j].u.score = 0; vector[j].u.cmpobj = NULL; j++; } + listTypeReleaseIterator(li); } else { dict *set; dictIterator *di; @@ -6382,38 +7567,36 @@ static void sortCommand(redisClient *c) { /* Now it's time to load the right scores in the sorting vector */ if (dontsort == 0) { for (j = 0; j < vectorlen; j++) { + robj *byval; if (sortby) { - robj *byval; - + /* lookup value to sort by */ byval = lookupKeyByPattern(c->db,sortby,vector[j].obj); - if (!byval || byval->type != REDIS_STRING) continue; - if (alpha) { - vector[j].u.cmpobj = getDecodedObject(byval); - } else { - if (byval->encoding == REDIS_ENCODING_RAW) { - vector[j].u.score = strtod(byval->ptr,NULL); - } else { - /* Don't need to decode the object if it's - * integer-encoded (the only encoding supported) so - * far. We can just cast it */ - if (byval->encoding == REDIS_ENCODING_INT) { - vector[j].u.score = (long)byval->ptr; - } else - redisAssert(1 != 1); - } - } + if (!byval) continue; } else { - if (!alpha) { - if (vector[j].obj->encoding == REDIS_ENCODING_RAW) - vector[j].u.score = strtod(vector[j].obj->ptr,NULL); - else { - if (vector[j].obj->encoding == REDIS_ENCODING_INT) - vector[j].u.score = (long) vector[j].obj->ptr; - else - redisAssert(1 != 1); - } + /* use object itself to sort by */ + byval = vector[j].obj; + } + + if (alpha) { + if (sortby) vector[j].u.cmpobj = getDecodedObject(byval); + } else { + if (byval->encoding == REDIS_ENCODING_RAW) { + vector[j].u.score = strtod(byval->ptr,NULL); + } else if (byval->encoding == REDIS_ENCODING_INT) { + /* Don't need to decode the object if it's + * integer-encoded (the only encoding supported) so + * far. 
We can just cast it */ + vector[j].u.score = (long)byval->ptr; + } else { + redisAssert(1 != 1); } } + + /* when the object was retrieved using lookupKeyByPattern, + * its refcount needs to be decreased. */ + if (sortby) { + decrRefCount(byval); + } } } @@ -6455,10 +7638,11 @@ static void sortCommand(redisClient *c) { vector[j].obj); if (sop->type == REDIS_SORT_GET) { - if (!val || val->type != REDIS_STRING) { + if (!val) { addReply(c,shared.nullbulk); } else { addReplyBulk(c,val); + decrRefCount(val); } } else { redisAssert(sop->type == REDIS_SORT_GET); /* always fails */ @@ -6466,8 +7650,7 @@ static void sortCommand(redisClient *c) { } } } else { - robj *listObject = createListObject(); - list *listPtr = (list*) listObject->ptr; + robj *sobj = createZiplistObject(); /* STORE option specified, set the sorting result as a List object */ for (j = start; j <= end; j++) { @@ -6475,30 +7658,30 @@ static void sortCommand(redisClient *c) { listIter li; if (!getop) { - listAddNodeTail(listPtr,vector[j].obj); - incrRefCount(vector[j].obj); - } - listRewind(operations,&li); - while((ln = listNext(&li))) { - redisSortOperation *sop = ln->value; - robj *val = lookupKeyByPattern(c->db,sop->pattern, - vector[j].obj); - - if (sop->type == REDIS_SORT_GET) { - if (!val || val->type != REDIS_STRING) { - listAddNodeTail(listPtr,createStringObject("",0)); + listTypePush(sobj,vector[j].obj,REDIS_TAIL); + } else { + listRewind(operations,&li); + while((ln = listNext(&li))) { + redisSortOperation *sop = ln->value; + robj *val = lookupKeyByPattern(c->db,sop->pattern, + vector[j].obj); + + if (sop->type == REDIS_SORT_GET) { + if (!val) val = createStringObject("",0); + + /* listTypePush does an incrRefCount, so we should take care + * care of the incremented refcount caused by either + * lookupKeyByPattern or createStringObject("",0) */ + listTypePush(sobj,val,REDIS_TAIL); + decrRefCount(val); } else { - listAddNodeTail(listPtr,val); - incrRefCount(val); + /* always fails */ + 
redisAssert(sop->type == REDIS_SORT_GET); } - } else { - redisAssert(sop->type == REDIS_SORT_GET); /* always fails */ } } } - if (dictReplace(c->db->dict,storekey,listObject)) { - incrRefCount(storekey); - } + dbReplace(c->db,storekey,sobj); /* Note: we add 1 because the DB is dirty anyway since even if the * SORT result is empty a new key is set and maybe the old content * replaced. */ @@ -6507,10 +7690,13 @@ static void sortCommand(redisClient *c) { } /* Cleanup */ + if (sortval->type == REDIS_LIST) + for (j = 0; j < vectorlen; j++) + decrRefCount(vector[j].obj); decrRefCount(sortval); listRelease(operations); for (j = 0; j < vectorlen; j++) { - if (sortby && alpha && vector[j].u.cmpobj) + if (alpha && vector[j].u.cmpobj) decrRefCount(vector[j].u.cmpobj); } zfree(vector); @@ -6549,6 +7735,8 @@ static sds genRedisInfoString(void) { bytesToHuman(hmem,zmalloc_used_memory()); info = sdscatprintf(sdsempty(), "redis_version:%s\r\n" + "redis_git_sha1:%s\r\n" + "redis_git_dirty:%d\r\n" "arch_bits:%s\r\n" "multiplexing_api:%s\r\n" "process_id:%ld\r\n" @@ -6566,11 +7754,15 @@ static sds genRedisInfoString(void) { "total_connections_received:%lld\r\n" "total_commands_processed:%lld\r\n" "expired_keys:%lld\r\n" - "hash_max_zipmap_entries:%ld\r\n" - "hash_max_zipmap_value:%ld\r\n" + "hash_max_zipmap_entries:%zu\r\n" + "hash_max_zipmap_value:%zu\r\n" + "pubsub_channels:%ld\r\n" + "pubsub_patterns:%u\r\n" "vm_enabled:%d\r\n" "role:%s\r\n" ,REDIS_VERSION, + redisGitSHA1(), + strtol(redisGitDirty(),NULL,10) > 0, (sizeof(long) == 8) ? "64" : "32", aeGetApiName(), (long) getpid(), @@ -6590,6 +7782,8 @@ static sds genRedisInfoString(void) { server.stat_expiredkeys, server.hash_max_zipmap_entries, server.hash_max_zipmap_value, + dictSize(server.pubsub_channels), + listLength(server.pubsub_patterns), server.vm_enabled != 0, server.masterhost == NULL ? 
"master" : "slave" ); @@ -6669,7 +7863,10 @@ static void monitorCommand(redisClient *c) { /* ================================= Expire ================================= */ static int removeExpire(redisDb *db, robj *key) { - if (dictDelete(db->expires,key) == DICT_OK) { + /* An expire may only be removed if there is a corresponding entry in the + * main dict. Otherwise, the key will never be freed. */ + redisAssert(dictFind(db->dict,key->ptr) != NULL); + if (dictDelete(db->expires,key->ptr) == DICT_OK) { return 1; } else { return 0; @@ -6677,10 +7874,13 @@ static int removeExpire(redisDb *db, robj *key) { } static int setExpire(redisDb *db, robj *key, time_t when) { - if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) { + dictEntry *de; + + /* Reuse the sds from the main dict in the expire dict */ + redisAssert((de = dictFind(db->dict,key->ptr)) != NULL); + if (dictAdd(db->expires,dictGetEntryKey(de),(void*)when) == DICT_ERR) { return 0; } else { - incrRefCount(key); return 1; } } @@ -6692,53 +7892,51 @@ static time_t getExpire(redisDb *db, robj *key) { /* No expire? return ASAP */ if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return -1; + (de = dictFind(db->expires,key->ptr)) == NULL) return -1; + /* The entry was found in the expire dict, this means it should also + * be present in the main dict (safety check). */ + redisAssert(dictFind(db->dict,key->ptr) != NULL); return (time_t) dictGetEntryVal(de); } static int expireIfNeeded(redisDb *db, robj *key) { - time_t when; - dictEntry *de; + time_t when = getExpire(db,key); + if (when < 0) return 0; - /* No expire? 
return ASAP */ - if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return 0; - - /* Lookup the expire */ - when = (time_t) dictGetEntryVal(de); + /* Return when this key has not expired */ if (time(NULL) <= when) return 0; /* Delete the key */ - dictDelete(db->expires,key); server.stat_expiredkeys++; - return dictDelete(db->dict,key) == DICT_OK; + server.dirty++; + return dbDelete(db,key); } static int deleteIfVolatile(redisDb *db, robj *key) { - dictEntry *de; - - /* No expire? return ASAP */ - if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return 0; + if (getExpire(db,key) < 0) return 0; /* Delete the key */ - server.dirty++; server.stat_expiredkeys++; - dictDelete(db->expires,key); - return dictDelete(db->dict,key) == DICT_OK; + server.dirty++; + return dbDelete(db,key); } -static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) { +static void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) { dictEntry *de; + time_t seconds; + + if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return; - de = dictFind(c->db->dict,key); + seconds -= offset; + + de = dictFind(c->db->dict,key->ptr); if (de == NULL) { addReply(c,shared.czero); return; } - if (seconds < 0) { - if (deleteKey(c->db,key)) server.dirty++; + if (seconds <= 0) { + if (dbDelete(c->db,key)) server.dirty++; addReply(c, shared.cone); return; } else { @@ -6754,11 +7952,11 @@ static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) { } static void expireCommand(redisClient *c) { - expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)); + expireGenericCommand(c,c->argv[1],c->argv[2],0); } static void expireatCommand(redisClient *c) { - expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL)); + expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL)); } static void ttlCommand(redisClient *c) { @@ -6814,6 +8012,10 @@ static void 
queueMultiCommand(redisClient *c, struct redisCommand *cmd) { } static void multiCommand(redisClient *c) { + if (c->flags & REDIS_MULTI) { + addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n")); + return; + } c->flags |= REDIS_MULTI; addReply(c,shared.ok); } @@ -6827,9 +8029,24 @@ static void discardCommand(redisClient *c) { freeClientMultiState(c); initClientMultiState(c); c->flags &= (~REDIS_MULTI); + unwatchAllKeys(c); addReply(c,shared.ok); } +/* Send a MULTI command to all the slaves and AOF file. Check the execCommand + * implememntation for more information. */ +static void execCommandReplicateMulti(redisClient *c) { + struct redisCommand *cmd; + robj *multistring = createStringObject("MULTI",5); + + cmd = lookupCommand("multi"); + if (server.appendonly) + feedAppendOnlyFile(cmd,c->db->id,&multistring,1); + if (listLength(server.slaves)) + replicationFeedSlaves(server.slaves,c->db->id,&multistring,1); + decrRefCount(multistring); +} + static void execCommand(redisClient *c) { int j; robj **orig_argv; @@ -6840,6 +8057,25 @@ static void execCommand(redisClient *c) { return; } + /* Check if we need to abort the EXEC if some WATCHed key was touched. + * A failed EXEC will return a multi bulk nil object. */ + if (c->flags & REDIS_DIRTY_CAS) { + freeClientMultiState(c); + initClientMultiState(c); + c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); + unwatchAllKeys(c); + addReply(c,shared.nullmultibulk); + return; + } + + /* Replicate a MULTI request now that we are sure the block is executed. + * This way we'll deliver the MULTI/..../EXEC block as a whole and + * both the AOF and the replication link will have the same consistency + * and atomicity guarantees. 
*/ + execCommandReplicateMulti(c); + + /* Exec all the queued commands */ + unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; orig_argc = c->argc; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count)); @@ -6852,7 +8088,11 @@ static void execCommand(redisClient *c) { c->argc = orig_argc; freeClientMultiState(c); initClientMultiState(c); - c->flags &= (~REDIS_MULTI); + c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); + /* Make sure the EXEC command is always replicated / AOF, since we + * always send the MULTI command (we can't know beforehand if the + * next operations will contain at least a modification to the DB). */ + server.dirty++; } /* =========================== Blocking Operations ========================= */ @@ -6875,7 +8115,7 @@ static void execCommand(redisClient *c) { * empty we need to block. In order to do so we remove the notification for * new data to read in the client socket (so that we'll not serve new * requests if the blocking request is not served). Also we put the client - * in a dictionary (db->blockingkeys) mapping keys to a list of clients + * in a dictionary (db->blocking_keys) mapping keys to a list of clients * blocking for this keys. 
* - If a PUSH operation against a key with blocked clients waiting is * performed, we serve the first in the list: basically instead to push @@ -6893,22 +8133,22 @@ static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeou list *l; int j; - c->blockingkeys = zmalloc(sizeof(robj*)*numkeys); - c->blockingkeysnum = numkeys; + c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); + c->blocking_keys_num = numkeys; c->blockingto = timeout; for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blockingkeys[j] = keys[j]; + c->blocking_keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ - de = dictFind(c->db->blockingkeys,keys[j]); + de = dictFind(c->db->blocking_keys,keys[j]); if (de == NULL) { int retval; /* For every key we take a list of clients blocked for it */ l = listCreate(); - retval = dictAdd(c->db->blockingkeys,keys[j],l); + retval = dictAdd(c->db->blocking_keys,keys[j],l); incrRefCount(keys[j]); assert(retval == DICT_OK); } else { @@ -6927,22 +8167,22 @@ static void unblockClientWaitingData(redisClient *c) { list *l; int j; - assert(c->blockingkeys != NULL); + assert(c->blocking_keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blockingkeysnum; j++) { + for (j = 0; j < c->blocking_keys_num; j++) { /* Remove this client from the list of clients waiting for this key. 
*/ - de = dictFind(c->db->blockingkeys,c->blockingkeys[j]); + de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); assert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blockingkeys,c->blockingkeys[j]); - decrRefCount(c->blockingkeys[j]); + dictDelete(c->db->blocking_keys,c->blocking_keys[j]); + decrRefCount(c->blocking_keys[j]); } /* Cleanup the client structure */ - zfree(c->blockingkeys); - c->blockingkeys = NULL; + zfree(c->blocking_keys); + c->blocking_keys = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -6969,7 +8209,7 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { list *l; listNode *ln; - de = dictFind(c->db->blockingkeys,key); + de = dictFind(c->db->blocking_keys,key); if (de == NULL) return 0; l = dictGetEntryVal(de); ln = listFirst(l); @@ -7002,7 +8242,7 @@ static void blockingPopGenericCommand(redisClient *c, int where) { * non-blocking POP operation */ robj *argv[2], **orig_argv; int orig_argc; - + /* We need to alter the command arguments before to call * popGenericCommand() as the command takes a single key. */ orig_argv = c->argv; @@ -7239,7 +8479,7 @@ static void updateSlavesWaitingBgsave(int bgsaveerr) { slave->replstate = REDIS_REPL_WAIT_BGSAVE_END; } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) { struct redis_stat buf; - + if (bgsaveerr != REDIS_OK) { freeClient(slave); redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error"); @@ -7466,7 +8706,7 @@ static void freeMemoryIfNeeded(void) { minttl = t; } } - deleteKey(server.db+j,minkey); + dbDelete(server.db+j,minkey); } } if (!freed) return; /* nothing to free... 
*/ @@ -7475,65 +8715,70 @@ static void freeMemoryIfNeeded(void) { /* ============================== Append Only file ========================== */ -static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { - sds buf = sdsempty(); - int j; - ssize_t nwritten; - time_t now; - robj *tmpargv[3]; +/* Called when the user switches from "appendonly yes" to "appendonly no" + * at runtime using the CONFIG command. */ +static void stopAppendOnly(void) { + flushAppendOnlyFile(); + aof_fsync(server.appendfd); + close(server.appendfd); - /* The DB this command was targetting is not the same as the last command - * we appendend. To issue a SELECT command is needed. */ - if (dictid != server.appendseldb) { - char seldb[64]; + server.appendfd = -1; + server.appendseldb = -1; + server.appendonly = 0; + /* rewrite operation in progress? kill it, wait child exit */ + if (server.bgsavechildpid != -1) { + int statloc; - snprintf(seldb,sizeof(seldb),"%d",dictid); - buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", - (unsigned long)strlen(seldb),seldb); - server.appendseldb = dictid; + if (kill(server.bgsavechildpid,SIGKILL) != -1) + wait3(&statloc,0,NULL); + /* reset the buffer accumulating changes while the child saves */ + sdsfree(server.bgrewritebuf); + server.bgrewritebuf = sdsempty(); + server.bgsavechildpid = -1; } +} - /* "Fix" the argv vector if the command is EXPIRE. We want to translate - * EXPIREs into EXPIREATs calls */ - if (cmd->proc == expireCommand) { - long when; - - tmpargv[0] = createStringObject("EXPIREAT",8); - tmpargv[1] = argv[1]; - incrRefCount(argv[1]); - when = time(NULL)+strtol(argv[2]->ptr,NULL,10); - tmpargv[2] = createObject(REDIS_STRING, - sdscatprintf(sdsempty(),"%ld",when)); - argv = tmpargv; +/* Called when the user switches from "appendonly no" to "appendonly yes" + * at runtime using the CONFIG command. 
*/ +static int startAppendOnly(void) { + server.appendonly = 1; + server.lastfsync = time(NULL); + server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); + if (server.appendfd == -1) { + redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno)); + return REDIS_ERR; } - - /* Append the actual command */ - buf = sdscatprintf(buf,"*%d\r\n",argc); - for (j = 0; j < argc; j++) { - robj *o = argv[j]; - - o = getDecodedObject(o); - buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr)); - buf = sdscatlen(buf,o->ptr,sdslen(o->ptr)); - buf = sdscatlen(buf,"\r\n",2); - decrRefCount(o); + if (rewriteAppendOnlyFileBackground() == REDIS_ERR) { + server.appendonly = 0; + close(server.appendfd); + redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno)); + return REDIS_ERR; } + return REDIS_OK; +} - /* Free the objects from the modified argv for EXPIREAT */ - if (cmd->proc == expireCommand) { - for (j = 0; j < 3; j++) - decrRefCount(argv[j]); - } +/* Write the append only file buffer on disk. + * + * Since we are required to write the AOF before replying to the client, + * and the only way the client socket can get a write is entering when the + * the event loop, we accumulate all the AOF writes in a memory + * buffer and write it on disk using this function just before entering + * the event loop again. */ +static void flushAppendOnlyFile(void) { + time_t now; + ssize_t nwritten; + + if (sdslen(server.aofbuf) == 0) return; /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. 
* While this will save us against the server being killed I don't think * there is much to do about the whole server stopping for power problems * or alike */ - nwritten = write(server.appendfd,buf,sdslen(buf)); - if (nwritten != (signed)sdslen(buf)) { + nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf)); + if (nwritten != (signed)sdslen(server.aofbuf)) { /* Ooops, we are in troubles. The best thing to do for now is - * to simply exit instead to give the illusion that everything is + * aborting instead of giving the illusion that everything is * working as expected. */ if (nwritten == -1) { redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno)); @@ -7542,58 +8787,141 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv } exit(1); } - /* If a background append only file rewriting is in progress we want to - * accumulate the differences between the child DB and the current one - * in a buffer, so that when the child process will do its work we - * can append the differences to the new append only file. */ - if (server.bgrewritechildpid != -1) - server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); + sdsfree(server.aofbuf); + server.aofbuf = sdsempty(); - sdsfree(buf); + /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have + * childs performing heavy I/O on disk. */ + if (server.no_appendfsync_on_rewrite && + (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1)) + return; + /* Fsync if needed */ now = time(NULL); if (server.appendfsync == APPENDFSYNC_ALWAYS || (server.appendfsync == APPENDFSYNC_EVERYSEC && now-server.lastfsync > 1)) { - fsync(server.appendfd); /* Let's try to get this data on the disk */ + /* aof_fsync is defined as fdatasync() for Linux in order to avoid + * flushing metadata. 
*/ + aof_fsync(server.appendfd); /* Let's try to get this data on the disk */ server.lastfsync = now; } } -/* In Redis commands are always executed in the context of a client, so in - * order to load the append only file we need to create a fake client. */ -static struct redisClient *createFakeClient(void) { - struct redisClient *c = zmalloc(sizeof(*c)); - - selectDb(c,0); - c->fd = -1; - c->querybuf = sdsempty(); - c->argc = 0; - c->argv = NULL; - c->flags = 0; - /* We set the fake client as a slave waiting for the synchronization - * so that Redis will not try to send replies to this client. */ - c->replstate = REDIS_REPL_WAIT_BGSAVE_START; - c->reply = listCreate(); - listSetFreeMethod(c->reply,decrRefCount); - listSetDupMethod(c->reply,dupClientReplyValue); - return c; +static sds catAppendOnlyGenericCommand(sds buf, int argc, robj **argv) { + int j; + buf = sdscatprintf(buf,"*%d\r\n",argc); + for (j = 0; j < argc; j++) { + robj *o = getDecodedObject(argv[j]); + buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr)); + buf = sdscatlen(buf,o->ptr,sdslen(o->ptr)); + buf = sdscatlen(buf,"\r\n",2); + decrRefCount(o); + } + return buf; } -static void freeFakeClient(struct redisClient *c) { - sdsfree(c->querybuf); - listRelease(c->reply); - zfree(c); +static sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) { + int argc = 3; + long when; + robj *argv[3]; + + /* Make sure we can use strtol */ + seconds = getDecodedObject(seconds); + when = time(NULL)+strtol(seconds->ptr,NULL,10); + decrRefCount(seconds); + + argv[0] = createStringObject("EXPIREAT",8); + argv[1] = key; + argv[2] = createObject(REDIS_STRING, + sdscatprintf(sdsempty(),"%ld",when)); + buf = catAppendOnlyGenericCommand(buf, argc, argv); + decrRefCount(argv[0]); + decrRefCount(argv[2]); + return buf; } -/* Replay the append log file. On error REDIS_OK is returned. On non fatal - * error (the append only file is zero-length) REDIS_ERR is returned. 
On - * fatal error an error message is logged and the program exists. */ +static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { + sds buf = sdsempty(); + robj *tmpargv[3]; + + /* The DB this command was targetting is not the same as the last command + * we appendend. To issue a SELECT command is needed. */ + if (dictid != server.appendseldb) { + char seldb[64]; + + snprintf(seldb,sizeof(seldb),"%d",dictid); + buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", + (unsigned long)strlen(seldb),seldb); + server.appendseldb = dictid; + } + + if (cmd->proc == expireCommand) { + /* Translate EXPIRE into EXPIREAT */ + buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]); + } else if (cmd->proc == setexCommand) { + /* Translate SETEX to SET and EXPIREAT */ + tmpargv[0] = createStringObject("SET",3); + tmpargv[1] = argv[1]; + tmpargv[2] = argv[3]; + buf = catAppendOnlyGenericCommand(buf,3,tmpargv); + decrRefCount(tmpargv[0]); + buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]); + } else { + buf = catAppendOnlyGenericCommand(buf,argc,argv); + } + + /* Append to the AOF buffer. This will be flushed on disk just before + * of re-entering the event loop, so before the client will get a + * positive reply about the operation performed. */ + server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); + + /* If a background append only file rewriting is in progress we want to + * accumulate the differences between the child DB and the current one + * in a buffer, so that when the child process will do its work we + * can append the differences to the new append only file. */ + if (server.bgrewritechildpid != -1) + server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); + + sdsfree(buf); +} + +/* In Redis commands are always executed in the context of a client, so in + * order to load the append only file we need to create a fake client. 
*/ +static struct redisClient *createFakeClient(void) { + struct redisClient *c = zmalloc(sizeof(*c)); + + selectDb(c,0); + c->fd = -1; + c->querybuf = sdsempty(); + c->argc = 0; + c->argv = NULL; + c->flags = 0; + /* We set the fake client as a slave waiting for the synchronization + * so that Redis will not try to send replies to this client. */ + c->replstate = REDIS_REPL_WAIT_BGSAVE_START; + c->reply = listCreate(); + listSetFreeMethod(c->reply,decrRefCount); + listSetDupMethod(c->reply,dupClientReplyValue); + initClientMultiState(c); + return c; +} + +static void freeFakeClient(struct redisClient *c) { + sdsfree(c->querybuf); + listRelease(c->reply); + freeClientMultiState(c); + zfree(c); +} + +/* Replay the append log file. On error REDIS_OK is returned. On non fatal + * error (the append only file is zero-length) REDIS_ERR is returned. On + * fatal error an error message is logged and the program exists. */ int loadAppendOnlyFile(char *filename) { struct redisClient *fakeClient; FILE *fp = fopen(filename,"r"); struct redis_stat sb; - unsigned long long loadedkeys = 0; + int appendonly = server.appendonly; if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) return REDIS_ERR; @@ -7603,6 +8931,10 @@ int loadAppendOnlyFile(char *filename) { exit(1); } + /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI + * to the same file we're about to read. 
*/ + server.appendonly = 0; + fakeClient = createFakeClient(); while(1) { int argc, j; @@ -7611,6 +8943,7 @@ int loadAppendOnlyFile(char *filename) { char buf[128]; sds argsds; struct redisCommand *cmd; + int force_swapout; if (fgets(buf,sizeof(buf),fp) == NULL) { if (feof(fp)) @@ -7637,14 +8970,9 @@ int loadAppendOnlyFile(char *filename) { redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr); exit(1); } - /* Try object sharing and encoding */ - if (server.shareobjects) { - int j; - for(j = 1; j < argc; j++) - argv[j] = tryObjectSharing(argv[j]); - } + /* Try object encoding */ if (cmd->flags & REDIS_CMD_BULK) - tryObjectEncoding(argv[argc-1]); + argv[argc-1] = tryObjectEncoding(argv[argc-1]); /* Run the command in the context of a fake client */ fakeClient->argc = argc; fakeClient->argv = argv; @@ -7656,15 +8984,24 @@ int loadAppendOnlyFile(char *filename) { for (j = 0; j < argc; j++) decrRefCount(argv[j]); zfree(argv); /* Handle swapping while loading big datasets when VM is on */ - loadedkeys++; - if (server.vm_enabled && (loadedkeys % 5000) == 0) { + force_swapout = 0; + if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) + force_swapout = 1; + + if (server.vm_enabled && force_swapout) { while (zmalloc_used_memory() > server.vm_max_memory) { if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } } } + + /* This point can only be reached when EOF is reached without errors. + * If the client is in the middle of a MULTI/EXEC, log error and quit. 
*/ + if (fakeClient->flags & REDIS_MULTI) goto readerr; + fclose(fp); freeFakeClient(fakeClient); + server.appendonly = appendonly; return REDIS_OK; readerr: @@ -7679,40 +9016,17 @@ fmterr: exit(1); } -/* Write an object into a file in the bulk format $\r\n\r\n */ -static int fwriteBulkObject(FILE *fp, robj *obj) { - char buf[128]; - int decrrc = 0; - - /* Avoid the incr/decr ref count business if possible to help - * copy-on-write (we are often in a child process when this function - * is called). - * Also makes sure that key objects don't get incrRefCount-ed when VM - * is enabled */ - if (obj->encoding != REDIS_ENCODING_RAW) { - obj = getDecodedObject(obj); - decrrc = 1; - } - snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr)); - if (fwrite(buf,strlen(buf),1,fp) == 0) goto err; - if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) - goto err; - if (fwrite("\r\n",2,1,fp) == 0) goto err; - if (decrrc) decrRefCount(obj); - return 1; -err: - if (decrrc) decrRefCount(obj); - return 0; -} - /* Write binary-safe string into a file in the bulkformat * $\r\n\r\n */ static int fwriteBulkString(FILE *fp, char *s, unsigned long len) { - char buf[128]; - - snprintf(buf,sizeof(buf),"$%ld\r\n",(unsigned long)len); - if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; - if (len && fwrite(s,len,1,fp) == 0) return 0; + char cbuf[128]; + int clen; + cbuf[0] = '$'; + clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len); + cbuf[clen++] = '\r'; + cbuf[clen++] = '\n'; + if (fwrite(cbuf,clen,1,fp) == 0) return 0; + if (len > 0 && fwrite(s,len,1,fp) == 0) return 0; if (fwrite("\r\n",2,1,fp) == 0) return 0; return 1; } @@ -7729,16 +9043,28 @@ static int fwriteBulkDouble(FILE *fp, double d) { } /* Write a long value in bulk format $\r\n\r\n */ -static int fwriteBulkLong(FILE *fp, long l) { - char buf[128], lbuf[128]; - - snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l); - snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(lbuf)-2); - if (fwrite(buf,strlen(buf),1,fp) == 0) 
return 0; - if (fwrite(lbuf,strlen(lbuf),1,fp) == 0) return 0; +static int fwriteBulkLongLong(FILE *fp, long long l) { + char bbuf[128], lbuf[128]; + unsigned int blen, llen; + llen = ll2string(lbuf,32,l); + blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf); + if (fwrite(bbuf,blen,1,fp) == 0) return 0; return 1; } +/* Delegate writing an object to writing a bulk string or bulk long long. */ +static int fwriteBulkObject(FILE *fp, robj *obj) { + /* Avoid using getDecodedObject to help copy-on-write (we are often + * in a child process when this function is called). */ + if (obj->encoding == REDIS_ENCODING_INT) { + return fwriteBulkLongLong(fp,(long)obj->ptr); + } else if (obj->encoding == REDIS_ENCODING_RAW) { + return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr)); + } else { + redisPanic("Unknown string encoding"); + } +} + /* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */ static int rewriteAppendOnlyFile(char *filename) { @@ -7770,28 +9096,30 @@ static int rewriteAppendOnlyFile(char *filename) { /* SELECT the new DB */ if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkLong(fp,j) == 0) goto werr; + if (fwriteBulkLongLong(fp,j) == 0) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key, *o; + sds keystr = dictGetEntryKey(de); + robj key, *o; time_t expiretime; int swapped; - key = dictGetEntryKey(de); + keystr = dictGetEntryKey(de); + o = dictGetEntryVal(de); + initStaticStringObject(key,keystr); /* If the value for this key is swapped, load a preview in memory. 
* We use a "swapped" flag to remember if we need to free the * value object instead to just increment the ref count anyway * in order to avoid copy-on-write of pages if we are forked() */ - if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) { - o = dictGetEntryVal(de); + if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || + o->storage == REDIS_VM_SWAPPING) { swapped = 0; } else { - o = vmPreviewObject(key); + o = vmPreviewObject(o); swapped = 1; } - expiretime = getExpire(db,key); + expiretime = getExpire(db,&key); /* Save the key and associated value */ if (o->type == REDIS_STRING) { @@ -7799,22 +9127,45 @@ static int rewriteAppendOnlyFile(char *filename) { char cmd[]="*3\r\n$3\r\nSET\r\n"; if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; /* Key and value */ - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { /* Emit the RPUSHes needed to rebuild the list */ - list *list = o->ptr; - listNode *ln; - listIter li; + char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = o->ptr; + unsigned char *p = ziplistIndex(zl,0); + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + while(ziplistGet(p,&vstr,&vlen,&vlong)) { + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (vstr) { + if (fwriteBulkString(fp,(char*)vstr,vlen) == 0) + goto werr; + } else { + if (fwriteBulkLongLong(fp,vlong) == 0) + goto werr; + } + p = ziplistNext(zl,p); + } + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + list *list = o->ptr; + listNode *ln; + listIter li; - listRewind(list,&li); - while((ln = listNext(&li))) { - char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; - robj *eleobj = listNodeValue(ln); + listRewind(list,&li); + while((ln = listNext(&li))) { + robj *eleobj = listNodeValue(ln); - if 
(fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + } + } else { + redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { /* Emit the SADDs needed to rebuild the set */ @@ -7827,7 +9178,7 @@ static int rewriteAppendOnlyFile(char *filename) { robj *eleobj = dictGetEntryKey(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } dictReleaseIterator(di); @@ -7843,7 +9194,7 @@ static int rewriteAppendOnlyFile(char *filename) { double *score = dictGetEntryVal(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkDouble(fp,*score) == 0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } @@ -7859,7 +9210,7 @@ static int rewriteAppendOnlyFile(char *filename) { while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkString(fp,(char*)field,flen) == -1) return -1; if (fwriteBulkString(fp,(char*)val,vlen) == -1) @@ -7874,14 +9225,14 @@ static int rewriteAppendOnlyFile(char *filename) { robj *val = dictGetEntryVal(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,field) == -1) return -1; if (fwriteBulkObject(fp,val) == -1) return -1; } dictReleaseIterator(di); } } else { - redisAssert(0); + redisPanic("Unknown object type"); } /* Save the expire time */ 
if (expiretime != -1) { @@ -7889,8 +9240,8 @@ static int rewriteAppendOnlyFile(char *filename) { /* If this key is already expired skip it */ if (expiretime < now) continue; if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; - if (fwriteBulkLong(fp,expiretime) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr; } if (swapped) decrRefCount(o); } @@ -7899,9 +9250,9 @@ static int rewriteAppendOnlyFile(char *filename) { /* Make sure data will not remain on the OS's output buffers */ fflush(fp); - fsync(fileno(fp)); + aof_fsync(fileno(fp)); fclose(fp); - + /* Use RENAME to make sure the DB file is changed atomically only * if the generate DB file is ok. */ if (rename(tmpfile,filename) == -1) { @@ -7960,6 +9311,7 @@ static int rewriteAppendOnlyFileBackground(void) { redisLog(REDIS_NOTICE, "Background append only file rewriting started by pid %d",childpid); server.bgrewritechildpid = childpid; + updateDictResizePolicy(); /* We set appendseldb to -1 in order to force the next call to the * feedAppendOnlyFile() to issue a SELECT command, so the differences * accumulated by the parent into server.bgrewritebuf will start @@ -8013,42 +9365,49 @@ static void aofRemoveTempFile(pid_t childpid) { /* =================== Virtual Memory - Blocking Side ====================== */ -/* substitute the first occurrence of '%p' with the process pid in the - * swap file name. */ -static void expandVmSwapFilename(void) { - char *p = strstr(server.vm_swap_file,"%p"); - sds new; - - if (!p) return; - new = sdsempty(); - *p = '\0'; - new = sdscat(new,server.vm_swap_file); - new = sdscatprintf(new,"%ld",(long) getpid()); - new = sdscat(new,p+2); - zfree(server.vm_swap_file); - server.vm_swap_file = new; +/* Create a VM pointer object. This kind of objects are used in place of + * values in the key -> value hash table, for swapped out objects. 
*/ +static vmpointer *createVmPointer(int vtype) { + vmpointer *vp = zmalloc(sizeof(vmpointer)); + + vp->type = REDIS_VMPOINTER; + vp->storage = REDIS_VM_SWAPPED; + vp->vtype = vtype; + return vp; } static void vmInit(void) { off_t totsize; int pipefds[2]; size_t stacksize; + struct flock fl; if (server.vm_max_threads != 0) zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ - expandVmSwapFilename(); redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file); + /* Try to open the old swap file, otherwise create it */ if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) { server.vm_fp = fopen(server.vm_swap_file,"w+b"); } if (server.vm_fp == NULL) { redisLog(REDIS_WARNING, - "Impossible to open the swap file: %s. Exiting.", + "Can't open the swap file: %s. Exiting.", strerror(errno)); exit(1); } server.vm_fd = fileno(server.vm_fp); + /* Lock the swap file for writing, this is useful in order to avoid + * another instance to use the same swap file for a config error. */ + fl.l_type = F_WRLCK; + fl.l_whence = SEEK_SET; + fl.l_start = fl.l_len = 0; + if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) { + redisLog(REDIS_WARNING, + "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno)); + exit(1); + } + /* Initialize */ server.vm_next_page = 0; server.vm_near_pages = 0; server.vm_stats_used_pages = 0; @@ -8143,7 +9502,7 @@ static int vmFreePage(off_t page) { } /* Find N contiguous free pages storing the first page of the cluster in *first. - * Returns REDIS_OK if it was able to find N contiguous pages, otherwise + * Returns REDIS_OK if it was able to find N contiguous pages, otherwise * REDIS_ERR is returned. 
* * This function uses a simple algorithm: we try to allocate @@ -8154,7 +9513,7 @@ static int vmFreePage(off_t page) { * we try to find less populated places doing a forward jump of * REDIS_VM_MAX_RANDOM_JUMP, then we start scanning again a few pages * without hurry, and then we jump again and so forth... - * + * * This function can be improved using a free list to avoid to guess * too much, since we could collect data about freed pages. * @@ -8231,30 +9590,33 @@ static int vmWriteObjectOnSwap(robj *o, off_t page) { return REDIS_OK; } -/* Swap the 'val' object relative to 'key' into disk. Store all the information - * needed to later retrieve the object into the key object. +/* Transfers the 'val' object to disk. Store all the information + * a 'vmpointer' object containing all the information needed to load the + * object back later is returned. + * * If we can't find enough contiguous empty pages to swap the object on disk - * REDIS_ERR is returned. */ -static int vmSwapObjectBlocking(robj *key, robj *val) { + * NULL is returned. */ +static vmpointer *vmSwapObjectBlocking(robj *val) { off_t pages = rdbSavedObjectPages(val,NULL); off_t page; + vmpointer *vp; - assert(key->storage == REDIS_VM_MEMORY); - assert(key->refcount == 1); - if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR; - if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return REDIS_ERR; - key->vm.page = page; - key->vm.usedpages = pages; - key->storage = REDIS_VM_SWAPPED; - key->vtype = val->type; + assert(val->storage == REDIS_VM_MEMORY); + assert(val->refcount == 1); + if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return NULL; + if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return NULL; + + vp = createVmPointer(val->type); + vp->page = page; + vp->usedpages = pages; decrRefCount(val); /* Deallocate the object from memory. 
*/ vmMarkPagesUsed(page,pages); - redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)", - (unsigned char*) key->ptr, + redisLog(REDIS_DEBUG,"VM: object %p swapped out at %lld (%lld pages)", + (void*) val, (unsigned long long) page, (unsigned long long) pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; - return REDIS_OK; + return vp; } static robj *vmReadObjectFromSwap(off_t page, int type) { @@ -8276,46 +9638,47 @@ static robj *vmReadObjectFromSwap(off_t page, int type) { return o; } -/* Load the value object relative to the 'key' object from swap to memory. +/* Load the specified object from swap to memory. * The newly allocated object is returned. * * If preview is true the unserialized object is returned to the caller but - * no changes are made to the key object, nor the pages are marked as freed */ -static robj *vmGenericLoadObject(robj *key, int preview) { + * the pages are not marked as freed, nor the vp object is freed. */ +static robj *vmGenericLoadObject(vmpointer *vp, int preview) { robj *val; - redisAssert(key->storage == REDIS_VM_SWAPPED || key->storage == REDIS_VM_LOADING); - val = vmReadObjectFromSwap(key->vm.page,key->vtype); + redisAssert(vp->type == REDIS_VMPOINTER && + (vp->storage == REDIS_VM_SWAPPED || vp->storage == REDIS_VM_LOADING)); + val = vmReadObjectFromSwap(vp->page,vp->vtype); if (!preview) { - key->storage = REDIS_VM_MEMORY; - key->vm.atime = server.unixtime; - vmMarkPagesFree(key->vm.page,key->vm.usedpages); - redisLog(REDIS_DEBUG, "VM: object %s loaded from disk", - (unsigned char*) key->ptr); + redisLog(REDIS_DEBUG, "VM: object %p loaded from disk", (void*)vp); + vmMarkPagesFree(vp->page,vp->usedpages); + zfree(vp); server.vm_stats_swapped_objects--; } else { - redisLog(REDIS_DEBUG, "VM: object %s previewed from disk", - (unsigned char*) key->ptr); + redisLog(REDIS_DEBUG, "VM: object %p previewed from disk", (void*)vp); } server.vm_stats_swapins++; return val; } -/* Plain object loading, from 
swap to memory */ -static robj *vmLoadObject(robj *key) { +/* Plain object loading, from swap to memory. + * + * 'o' is actually a redisVmPointer structure that will be freed by the call. + * The return value is the loaded object. */ +static robj *vmLoadObject(robj *o) { /* If we are loading the object in background, stop it, we * need to load this object synchronously ASAP. */ - if (key->storage == REDIS_VM_LOADING) - vmCancelThreadedIOJob(key); - return vmGenericLoadObject(key,0); + if (o->storage == REDIS_VM_LOADING) + vmCancelThreadedIOJob(o); + return vmGenericLoadObject((vmpointer*)o,0); } /* Just load the value on disk, without to modify the key. * This is useful when we want to perform some operation on the value * without to really bring it from swap to memory, like while saving the * dataset or rewriting the append only log. */ -static robj *vmPreviewObject(robj *key) { - return vmGenericLoadObject(key,1); +static robj *vmPreviewObject(robj *o) { + return vmGenericLoadObject((vmpointer*)o,1); } /* How a good candidate is this object for swapping? @@ -8330,14 +9693,18 @@ static robj *vmPreviewObject(robj *key) { * proportionally, this is why we use the logarithm. This algorithm is * just a first try and will probably be tuned later. */ static double computeObjectSwappability(robj *o) { - time_t age = server.unixtime - o->vm.atime; - long asize = 0; + /* actual age can be >= minage, but not < minage. As we use wrapping + * 21 bit clocks with minutes resolution for the LRU. 
*/ + time_t minage = abs(server.lruclock - o->lru); + long asize = 0, elesize; + robj *ele; list *l; + listNode *ln; dict *d; struct dictEntry *de; int z; - if (age <= 0) return 0; + if (minage <= 0) return 0; switch(o->type) { case REDIS_STRING: if (o->encoding != REDIS_ENCODING_RAW) { @@ -8347,18 +9714,18 @@ static double computeObjectSwappability(robj *o) { } break; case REDIS_LIST: - l = o->ptr; - listNode *ln = listFirst(l); - - asize = sizeof(list); - if (ln) { - robj *ele = ln->value; - long elesize; - - elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); - asize += (sizeof(listNode)+elesize)*listLength(l); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + asize = sizeof(*o)+ziplistSize(o->ptr); + } else { + l = o->ptr; + ln = listFirst(l); + asize = sizeof(list); + if (ln) { + ele = ln->value; + elesize = (ele->encoding == REDIS_ENCODING_RAW) ? + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); + asize += (sizeof(listNode)+elesize)*listLength(l); + } } break; case REDIS_SET: @@ -8369,14 +9736,10 @@ static double computeObjectSwappability(robj *o) { asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d)); if (z) asize += sizeof(zset)-sizeof(dict); if (dictSize(d)) { - long elesize; - robj *ele; - de = dictGetRandomKey(d); ele = dictGetEntryKey(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); asize += (sizeof(struct dictEntry)+elesize)*dictSize(d); if (z) asize += sizeof(zskiplistNode)*dictSize(d); } @@ -8397,24 +9760,19 @@ static double computeObjectSwappability(robj *o) { d = o->ptr; asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d)); if (dictSize(d)) { - long elesize; - robj *ele; - de = dictGetRandomKey(d); ele = dictGetEntryKey(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? 
- (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); ele = dictGetEntryVal(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); asize += (sizeof(struct dictEntry)+elesize)*dictSize(d); } } break; } - return (double)age*log(1+asize); + return (double)minage*log(1+asize); } /* Try to swap an object that's a good candidate for swapping. @@ -8428,7 +9786,8 @@ static int vmSwapOneObject(int usethreads) { struct dictEntry *best = NULL; double best_swappability = 0; redisDb *best_db = NULL; - robj *key, *val; + robj *val; + sds key; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -8444,16 +9803,14 @@ static int vmSwapOneObject(int usethreads) { if (maxtries) maxtries--; de = dictGetRandomKey(db->dict); - key = dictGetEntryKey(de); val = dictGetEntryVal(de); /* Only swap objects that are currently in memory. * - * Also don't swap shared objects if threaded VM is on, as we - * try to ensure that the main thread does not touch the + * Also don't swap shared objects: not a good idea in general and + * we need to ensure that the main thread does not touch the * object while the I/O thread is using it, but we can't * control other keys without adding additional mutex. 
*/ - if (key->storage != REDIS_VM_MEMORY || - (server.vm_max_threads != 0 && val->refcount != 1)) { + if (val->storage != REDIS_VM_MEMORY || val->refcount != 1) { if (maxtries) i--; /* don't count this try */ continue; } @@ -8470,21 +9827,19 @@ static int vmSwapOneObject(int usethreads) { val = dictGetEntryVal(best); redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f", - key->ptr, best_swappability); + key, best_swappability); - /* Unshare the key if needed */ - if (key->refcount > 1) { - robj *newkey = dupStringObject(key); - decrRefCount(key); - key = dictGetEntryKey(best) = newkey; - } /* Swap it */ if (usethreads) { - vmSwapObjectThreaded(key,val,best_db); + robj *keyobj = createStringObject(key,sdslen(key)); + vmSwapObjectThreaded(keyobj,val,best_db); + decrRefCount(keyobj); return REDIS_OK; } else { - if (vmSwapObjectBlocking(key,val) == REDIS_OK) { - dictGetEntryVal(best) = NULL; + vmpointer *vp; + + if ((vp = vmSwapObjectBlocking(val)) != NULL) { + dictGetEntryVal(best) = vp; return REDIS_OK; } else { return REDIS_ERR; @@ -8507,26 +9862,19 @@ static int vmCanSwapOut(void) { return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1); } -/* Delete a key if swapped. Returns 1 if the key was found, was swapped - * and was deleted. Otherwise 0 is returned. */ -static int deleteIfSwapped(redisDb *db, robj *key) { - dictEntry *de; - robj *foundkey; - - if ((de = dictFind(db->dict,key)) == NULL) return 0; - foundkey = dictGetEntryKey(de); - if (foundkey->storage == REDIS_VM_MEMORY) return 0; - deleteKey(db,key); - return 1; -} - /* =================== Virtual Memory - Threaded I/O ======================= */ static void freeIOJob(iojob *j) { if ((j->type == REDIS_IOJOB_PREPARE_SWAP || j->type == REDIS_IOJOB_DO_SWAP || j->type == REDIS_IOJOB_LOAD) && j->val != NULL) + { + /* we fix the storage type, otherwise decrRefCount() will try to + * kill the I/O thread Job (that does no longer exists). 
*/ + if (j->val->storage == REDIS_VM_SWAPPING) + j->val->storage = REDIS_VM_MEMORY; decrRefCount(j->val); + } decrRefCount(j->key); zfree(j); } @@ -8548,7 +9896,6 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, while((retval = read(fd,buf,1)) == 1) { iojob *j; listNode *ln; - robj *key; struct dictEntry *de; redisLog(REDIS_DEBUG,"Processing I/O completed job"); @@ -8571,27 +9918,26 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, } /* Post process it in the main thread, as there are things we * can do just here to avoid race conditions and/or invasive locks */ - redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount); - de = dictFind(j->db->dict,j->key); - assert(de != NULL); - key = dictGetEntryKey(de); + redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr); + de = dictFind(j->db->dict,j->key->ptr); + redisAssert(de != NULL); if (j->type == REDIS_IOJOB_LOAD) { redisDb *db; + vmpointer *vp = dictGetEntryVal(de); /* Key loaded, bring it at home */ - key->storage = REDIS_VM_MEMORY; - key->vm.atime = server.unixtime; - vmMarkPagesFree(key->vm.page,key->vm.usedpages); + vmMarkPagesFree(vp->page,vp->usedpages); redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)", - (unsigned char*) key->ptr); + (unsigned char*) j->key->ptr); server.vm_stats_swapped_objects--; server.vm_stats_swapins++; dictGetEntryVal(de) = j->val; incrRefCount(j->val); db = j->db; - freeIOJob(j); /* Handle clients waiting for this key to be loaded. */ - handleClientsBlockedOnSwappedKey(db,key); + handleClientsBlockedOnSwappedKey(db,j->key); + freeIOJob(j); + zfree(vp); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { /* Now we know the amount of pages required to swap this object. 
* Let's find some space for it, and queue this task again @@ -8601,8 +9947,8 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, { /* Ooops... no space or we can't swap as there is * a fork()ed Redis trying to save stuff on disk. */ + j->val->storage = REDIS_VM_MEMORY; /* undo operation */ freeIOJob(j); - key->storage = REDIS_VM_MEMORY; /* undo operation */ } else { /* Note that we need to mark this pages as used now, * if the job will be canceled, we'll mark them as freed @@ -8614,28 +9960,29 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, unlockThreadedIO(); } } else if (j->type == REDIS_IOJOB_DO_SWAP) { - robj *val; + vmpointer *vp; /* Key swapped. We can finally free some memory. */ - if (key->storage != REDIS_VM_SWAPPING) { - printf("key->storage: %d\n",key->storage); - printf("key->name: %s\n",(char*)key->ptr); - printf("key->refcount: %d\n",key->refcount); + if (j->val->storage != REDIS_VM_SWAPPING) { + vmpointer *vp = (vmpointer*) j->id; + printf("storage: %d\n",vp->storage); + printf("key->name: %s\n",(char*)j->key->ptr); printf("val: %p\n",(void*)j->val); printf("val->type: %d\n",j->val->type); printf("val->ptr: %s\n",(char*)j->val->ptr); } - redisAssert(key->storage == REDIS_VM_SWAPPING); - val = dictGetEntryVal(de); - key->vm.page = j->page; - key->vm.usedpages = j->pages; - key->storage = REDIS_VM_SWAPPED; - key->vtype = j->val->type; - decrRefCount(val); /* Deallocate the object from memory. 
*/ - dictGetEntryVal(de) = NULL; + redisAssert(j->val->storage == REDIS_VM_SWAPPING); + vp = createVmPointer(j->val->type); + vp->page = j->page; + vp->usedpages = j->pages; + dictGetEntryVal(de) = vp; + /* Fix the storage otherwise decrRefCount will attempt to + * remove the associated I/O job */ + j->val->storage = REDIS_VM_MEMORY; + decrRefCount(j->val); redisLog(REDIS_DEBUG, "VM: object %s swapped out at %lld (%lld pages) (threaded)", - (unsigned char*) key->ptr, + (unsigned char*) j->key->ptr, (unsigned long long) j->page, (unsigned long long) j->pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; @@ -8690,7 +10037,7 @@ static void vmCancelThreadedIOJob(robj *o) { assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING); again: lockThreadedIO(); - /* Search for a matching key in one of the queues */ + /* Search for a matching object in one of the queues */ for (i = 0; i < 3; i++) { listNode *ln; listIter li; @@ -8700,9 +10047,9 @@ again: iojob *job = ln->value; if (job->canceled) continue; /* Skip this, already canceled. */ - if (compareStringObjects(job->key,o) == 0) { - redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n", - (void*)job, (char*)o->ptr, job->type, i); + if (job->id == o) { + redisLog(REDIS_DEBUG,"*** CANCELED %p (key %s) (type %d) (LIST ID %d)\n", + (void*)job, (char*)job->key->ptr, job->type, i); /* Mark the pages as free since the swap didn't happened * or happened but is now discarded. 
*/ if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP) @@ -8750,12 +10097,14 @@ again: else if (o->storage == REDIS_VM_SWAPPING) o->storage = REDIS_VM_MEMORY; unlockThreadedIO(); + redisLog(REDIS_DEBUG,"*** DONE"); return; } } } unlockThreadedIO(); - assert(1 != 1); /* We should never reach this */ + printf("Not found: %p\n", (void*)o); + redisAssert(1 != 1); /* We should never reach this */ } static void *IOThreadEntryPoint(void *arg) { @@ -8788,7 +10137,8 @@ static void *IOThreadEntryPoint(void *arg) { /* Process the Job */ if (j->type == REDIS_IOJOB_LOAD) { - j->val = vmReadObjectFromSwap(j->page,j->key->vtype); + vmpointer *vp = (vmpointer*)j->id; + j->val = vmReadObjectFromSwap(j->page,vp->vtype); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { FILE *fp = fopen("/dev/null","w+"); j->pages = rdbSavedObjectPages(j->val,fp); @@ -8805,7 +10155,7 @@ static void *IOThreadEntryPoint(void *arg) { listDelNode(server.io_processing,ln); listAddNodeTail(server.io_processed,j); unlockThreadedIO(); - + /* Signal the main thread there is new stuff to process */ assert(write(server.io_ready_pipe_write,"x",1) == 1); } @@ -8883,19 +10233,17 @@ static void queueIOJob(iojob *j) { static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { iojob *j; - - assert(key->storage == REDIS_VM_MEMORY); - assert(key->refcount == 1); j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_PREPARE_SWAP; j->db = db; - j->key = dupStringObject(key); - j->val = val; + j->key = key; + incrRefCount(key); + j->id = j->val = val; incrRefCount(val); j->canceled = 0; j->thread = (pthread_t) -1; - key->storage = REDIS_VM_SWAPPING; + val->storage = REDIS_VM_SWAPPING; lockThreadedIO(); queueIOJob(j); @@ -8917,9 +10265,9 @@ static int waitForSwappedKey(redisClient *c, robj *key) { /* If the key does not exist or is already in RAM we don't need to * block the client at all. 
*/ - de = dictFind(c->db->dict,key); + de = dictFind(c->db->dict,key->ptr); if (de == NULL) return 0; - o = dictGetEntryKey(de); + o = dictGetEntryVal(de); if (o->storage == REDIS_VM_MEMORY) { return 0; } else if (o->storage == REDIS_VM_SWAPPING) { @@ -8927,7 +10275,7 @@ static int waitForSwappedKey(redisClient *c, robj *key) { vmCancelThreadedIOJob(o); return 0; } - + /* OK: the key is either swapped, or being loaded just now. */ /* Add the key to the list of keys this client is waiting for. @@ -8953,14 +10301,16 @@ static int waitForSwappedKey(redisClient *c, robj *key) { /* Are we already loading the key from disk? If not create a job */ if (o->storage == REDIS_VM_SWAPPED) { iojob *j; + vmpointer *vp = (vmpointer*)o; o->storage = REDIS_VM_LOADING; j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_LOAD; j->db = c->db; - j->key = dupStringObject(key); - j->key->vtype = o->vtype; - j->page = o->vm.page; + j->id = (robj*)vp; + j->key = key; + incrRefCount(key); + j->page = vp->page; j->val = NULL; j->canceled = 0; j->thread = (pthread_t) -1; @@ -8971,12 +10321,56 @@ static int waitForSwappedKey(redisClient *c, robj *key) { return 1; } -/* Preload keys needed for the ZUNION and ZINTER commands. */ -static void zunionInterBlockClientOnSwappedKeys(redisClient *c) { +/* Preload keys for any command with first, last and step values for + * the command keys prototype, as defined in the command table. */ +static void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { + int j, last; + if (cmd->vm_firstkey == 0) return; + last = cmd->vm_lastkey; + if (last < 0) last = argc+last; + for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) { + redisAssert(j < argc); + waitForSwappedKey(c,argv[j]); + } +} + +/* Preload keys needed for the ZUNIONSTORE and ZINTERSTORE commands. + * Note that the number of keys to preload is user-defined, so we need to + * apply a sanity check against argc. 
*/ +static void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { int i, num; - num = atoi(c->argv[2]->ptr); + REDIS_NOTUSED(cmd); + + num = atoi(argv[2]->ptr); + if (num > (argc-3)) return; for (i = 0; i < num; i++) { - waitForSwappedKey(c,c->argv[3+i]); + waitForSwappedKey(c,argv[3+i]); + } +} + +/* Preload keys needed to execute the entire MULTI/EXEC block. + * + * This function is called by blockClientOnSwappedKeys when EXEC is issued, + * and will block the client when any command requires a swapped out value. */ +static void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { + int i, margc; + struct redisCommand *mcmd; + robj **margv; + REDIS_NOTUSED(cmd); + REDIS_NOTUSED(argc); + REDIS_NOTUSED(argv); + + if (!(c->flags & REDIS_MULTI)) return; + for (i = 0; i < c->mstate.count; i++) { + mcmd = c->mstate.commands[i].cmd; + margc = c->mstate.commands[i].argc; + margv = c->mstate.commands[i].argv; + + if (mcmd->vm_preload_proc != NULL) { + mcmd->vm_preload_proc(c,mcmd,margc,margv); + } else { + waitForMultipleSwappedKeys(c,mcmd,margc,margv); + } } } @@ -8990,17 +10384,11 @@ static void zunionInterBlockClientOnSwappedKeys(redisClient *c) { * * Return 1 if the client is marked as blocked, 0 if the client can * continue as the keys it is going to access appear to be in memory. 
*/ -static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c) { - int j, last; - +static int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) { if (cmd->vm_preload_proc != NULL) { - cmd->vm_preload_proc(c); + cmd->vm_preload_proc(c,cmd,c->argc,c->argv); } else { - if (cmd->vm_firstkey == 0) return 0; - last = cmd->vm_lastkey; - if (last < 0) last = c->argc+last; - for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) - waitForSwappedKey(c,c->argv[j]); + waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv); } /* If the client was blocked for at least one key, mark it as blocked. */ @@ -9027,7 +10415,7 @@ static int dontWaitForSwappedKey(redisClient *c, robj *key) { /* Remove the key from the list of keys this client is waiting for. */ listRewind(c->io_keys,&li); while ((ln = listNext(&li)) != NULL) { - if (compareStringObjects(ln->value,key) == 0) { + if (equalStringObjects(ln->value,key)) { listDelNode(c->io_keys,ln); break; } @@ -9047,6 +10435,8 @@ static int dontWaitForSwappedKey(redisClient *c, robj *key) { return listLength(c->io_keys) == 0; } +/* Every time we now a key was loaded back in memory, we handle clients + * waiting for this key if any. 
*/ static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { struct dictEntry *de; list *l; @@ -9076,6 +10466,8 @@ static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { static void configSetCommand(redisClient *c) { robj *o = getDecodedObject(c->argv[3]); + long long ll; + if (!strcasecmp(c->argv[2]->ptr,"dbfilename")) { zfree(server.dbfilename); server.dbfilename = zstrdup(o->ptr); @@ -9086,7 +10478,79 @@ static void configSetCommand(redisClient *c) { zfree(server.masterauth); server.masterauth = zstrdup(o->ptr); } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory")) { - server.maxmemory = strtoll(o->ptr, NULL, 10); + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0) goto badfmt; + server.maxmemory = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"timeout")) { + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0 || ll > LONG_MAX) goto badfmt; + server.maxidletime = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"appendfsync")) { + if (!strcasecmp(o->ptr,"no")) { + server.appendfsync = APPENDFSYNC_NO; + } else if (!strcasecmp(o->ptr,"everysec")) { + server.appendfsync = APPENDFSYNC_EVERYSEC; + } else if (!strcasecmp(o->ptr,"always")) { + server.appendfsync = APPENDFSYNC_ALWAYS; + } else { + goto badfmt; + } + } else if (!strcasecmp(c->argv[2]->ptr,"no-appendfsync-on-rewrite")) { + int yn = yesnotoi(o->ptr); + + if (yn == -1) goto badfmt; + server.no_appendfsync_on_rewrite = yn; + } else if (!strcasecmp(c->argv[2]->ptr,"appendonly")) { + int old = server.appendonly; + int new = yesnotoi(o->ptr); + + if (new == -1) goto badfmt; + if (old != new) { + if (new == 0) { + stopAppendOnly(); + } else { + if (startAppendOnly() == REDIS_ERR) { + addReplySds(c,sdscatprintf(sdsempty(), + "-ERR Unable to turn on AOF. 
Check server logs.\r\n")); + decrRefCount(o); + return; + } + } + } + } else if (!strcasecmp(c->argv[2]->ptr,"save")) { + int vlen, j; + sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen); + + /* Perform sanity check before setting the new config: + * - Even number of args + * - Seconds >= 1, changes >= 0 */ + if (vlen & 1) { + sdsfreesplitres(v,vlen); + goto badfmt; + } + for (j = 0; j < vlen; j++) { + char *eptr; + long val; + + val = strtoll(v[j], &eptr, 10); + if (eptr[0] != '\0' || + ((j & 1) == 0 && val < 1) || + ((j & 1) == 1 && val < 0)) { + sdsfreesplitres(v,vlen); + goto badfmt; + } + } + /* Finally set the new config */ + resetServerSaveParams(); + for (j = 0; j < vlen; j += 2) { + time_t seconds; + int changes; + + seconds = strtoll(v[j],NULL,10); + changes = strtoll(v[j+1],NULL,10); + appendServerSaveParams(seconds, changes); + } + sdsfreesplitres(v,vlen); } else { addReplySds(c,sdscatprintf(sdsempty(), "-ERR not supported CONFIG parameter %s\r\n", @@ -9096,6 +10560,14 @@ static void configSetCommand(redisClient *c) { } decrRefCount(o); addReply(c,shared.ok); + return; + +badfmt: /* Bad format errors */ + addReplySds(c,sdscatprintf(sdsempty(), + "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n", + (char*)o->ptr, + (char*)c->argv[2]->ptr)); + decrRefCount(o); } static void configGetCommand(redisClient *c) { @@ -9125,11 +10597,58 @@ static void configGetCommand(redisClient *c) { if (stringmatch(pattern,"maxmemory",0)) { char buf[128]; - snprintf(buf,128,"%llu\n",server.maxmemory); + ll2string(buf,128,server.maxmemory); addReplyBulkCString(c,"maxmemory"); addReplyBulkCString(c,buf); matches++; } + if (stringmatch(pattern,"timeout",0)) { + char buf[128]; + + ll2string(buf,128,server.maxidletime); + addReplyBulkCString(c,"timeout"); + addReplyBulkCString(c,buf); + matches++; + } + if (stringmatch(pattern,"appendonly",0)) { + addReplyBulkCString(c,"appendonly"); + addReplyBulkCString(c,server.appendonly ? 
"yes" : "no"); + matches++; + } + if (stringmatch(pattern,"no-appendfsync-on-rewrite",0)) { + addReplyBulkCString(c,"no-appendfsync-on-rewrite"); + addReplyBulkCString(c,server.no_appendfsync_on_rewrite ? "yes" : "no"); + matches++; + } + if (stringmatch(pattern,"appendfsync",0)) { + char *policy; + + switch(server.appendfsync) { + case APPENDFSYNC_NO: policy = "no"; break; + case APPENDFSYNC_EVERYSEC: policy = "everysec"; break; + case APPENDFSYNC_ALWAYS: policy = "always"; break; + default: policy = "unknown"; break; /* too harmless to panic */ + } + addReplyBulkCString(c,"appendfsync"); + addReplyBulkCString(c,policy); + matches++; + } + if (stringmatch(pattern,"save",0)) { + sds buf = sdsempty(); + int j; + + for (j = 0; j < server.saveparamslen; j++) { + buf = sdscatprintf(buf,"%ld %d", + server.saveparams[j].seconds, + server.saveparams[j].changes); + if (j != server.saveparamslen-1) + buf = sdscatlen(buf," ",1); + } + addReplyBulkCString(c,"save"); + addReplyBulkCString(c,buf); + sdsfree(buf); + matches++; + } decrRefCount(o); lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2); } @@ -9160,8 +10679,581 @@ badarity: (char*) c->argv[1]->ptr)); } +/* =========================== Pubsub implementation ======================== */ + +static void freePubsubPattern(void *p) { + pubsubPattern *pat = p; + + decrRefCount(pat->pattern); + zfree(pat); +} + +static int listMatchPubsubPattern(void *a, void *b) { + pubsubPattern *pa = a, *pb = b; + + return (pa->client == pb->client) && + (equalStringObjects(pa->pattern,pb->pattern)); +} + +/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or + * 0 if the client was already subscribed to that channel. 
*/ +static int pubsubSubscribeChannel(redisClient *c, robj *channel) { + struct dictEntry *de; + list *clients = NULL; + int retval = 0; + + /* Add the channel to the client -> channels hash table */ + if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { + retval = 1; + incrRefCount(channel); + /* Add the client to the channel -> list of clients hash table */ + de = dictFind(server.pubsub_channels,channel); + if (de == NULL) { + clients = listCreate(); + dictAdd(server.pubsub_channels,channel,clients); + incrRefCount(channel); + } else { + clients = dictGetEntryVal(de); + } + listAddNodeTail(clients,c); + } + /* Notify the client */ + addReply(c,shared.mbulk3); + addReply(c,shared.subscribebulk); + addReplyBulk(c,channel); + addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); + return retval; +} + +/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or + * 0 if the client was not subscribed to the specified channel. */ +static int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { + struct dictEntry *de; + list *clients; + listNode *ln; + int retval = 0; + + /* Remove the channel from the client -> channels hash table */ + incrRefCount(channel); /* channel may be just a pointer to the same object + we have in the hash tables. Protect it... */ + if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { + retval = 1; + /* Remove the client from the channel -> clients list hash table */ + de = dictFind(server.pubsub_channels,channel); + assert(de != NULL); + clients = dictGetEntryVal(de); + ln = listSearchKey(clients,c); + assert(ln != NULL); + listDelNode(clients,ln); + if (listLength(clients) == 0) { + /* Free the list and associated hash entry at all if this was + * the latest client, so that it will be possible to abuse + * Redis PUBSUB creating millions of channels. 
*/ + dictDelete(server.pubsub_channels,channel); + } + } + /* Notify the client */ + if (notify) { + addReply(c,shared.mbulk3); + addReply(c,shared.unsubscribebulk); + addReplyBulk(c,channel); + addReplyLongLong(c,dictSize(c->pubsub_channels)+ + listLength(c->pubsub_patterns)); + + } + decrRefCount(channel); /* it is finally safe to release it */ + return retval; +} + +/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */ +static int pubsubSubscribePattern(redisClient *c, robj *pattern) { + int retval = 0; + + if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { + retval = 1; + pubsubPattern *pat; + listAddNodeTail(c->pubsub_patterns,pattern); + incrRefCount(pattern); + pat = zmalloc(sizeof(*pat)); + pat->pattern = getDecodedObject(pattern); + pat->client = c; + listAddNodeTail(server.pubsub_patterns,pat); + } + /* Notify the client */ + addReply(c,shared.mbulk3); + addReply(c,shared.psubscribebulk); + addReplyBulk(c,pattern); + addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); + return retval; +} + +/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or + * 0 if the client was not subscribed to the specified channel. */ +static int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { + listNode *ln; + pubsubPattern pat; + int retval = 0; + + incrRefCount(pattern); /* Protect the object. 
May be the same we remove */ + if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { + retval = 1; + listDelNode(c->pubsub_patterns,ln); + pat.client = c; + pat.pattern = pattern; + ln = listSearchKey(server.pubsub_patterns,&pat); + listDelNode(server.pubsub_patterns,ln); + } + /* Notify the client */ + if (notify) { + addReply(c,shared.mbulk3); + addReply(c,shared.punsubscribebulk); + addReplyBulk(c,pattern); + addReplyLongLong(c,dictSize(c->pubsub_channels)+ + listLength(c->pubsub_patterns)); + } + decrRefCount(pattern); + return retval; +} + +/* Unsubscribe from all the channels. Return the number of channels the + * client was subscribed from. */ +static int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { + dictIterator *di = dictGetIterator(c->pubsub_channels); + dictEntry *de; + int count = 0; + + while((de = dictNext(di)) != NULL) { + robj *channel = dictGetEntryKey(de); + + count += pubsubUnsubscribeChannel(c,channel,notify); + } + dictReleaseIterator(di); + return count; +} + +/* Unsubscribe from all the patterns. Return the number of patterns the + * client was subscribed from. 
*/ +static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) { + listNode *ln; + listIter li; + int count = 0; + + listRewind(c->pubsub_patterns,&li); + while ((ln = listNext(&li)) != NULL) { + robj *pattern = ln->value; + + count += pubsubUnsubscribePattern(c,pattern,notify); + } + return count; +} + +/* Publish a message */ +static int pubsubPublishMessage(robj *channel, robj *message) { + int receivers = 0; + struct dictEntry *de; + listNode *ln; + listIter li; + + /* Send to clients listening for that channel */ + de = dictFind(server.pubsub_channels,channel); + if (de) { + list *list = dictGetEntryVal(de); + listNode *ln; + listIter li; + + listRewind(list,&li); + while ((ln = listNext(&li)) != NULL) { + redisClient *c = ln->value; + + addReply(c,shared.mbulk3); + addReply(c,shared.messagebulk); + addReplyBulk(c,channel); + addReplyBulk(c,message); + receivers++; + } + } + /* Send to clients listening to matching channels */ + if (listLength(server.pubsub_patterns)) { + listRewind(server.pubsub_patterns,&li); + channel = getDecodedObject(channel); + while ((ln = listNext(&li)) != NULL) { + pubsubPattern *pat = ln->value; + + if (stringmatchlen((char*)pat->pattern->ptr, + sdslen(pat->pattern->ptr), + (char*)channel->ptr, + sdslen(channel->ptr),0)) { + addReply(pat->client,shared.mbulk4); + addReply(pat->client,shared.pmessagebulk); + addReplyBulk(pat->client,pat->pattern); + addReplyBulk(pat->client,channel); + addReplyBulk(pat->client,message); + receivers++; + } + } + decrRefCount(channel); + } + return receivers; +} + +static void subscribeCommand(redisClient *c) { + int j; + + for (j = 1; j < c->argc; j++) + pubsubSubscribeChannel(c,c->argv[j]); +} + +static void unsubscribeCommand(redisClient *c) { + if (c->argc == 1) { + pubsubUnsubscribeAllChannels(c,1); + return; + } else { + int j; + + for (j = 1; j < c->argc; j++) + pubsubUnsubscribeChannel(c,c->argv[j],1); + } +} + +static void psubscribeCommand(redisClient *c) { + int j; + + for (j = 1; j 
< c->argc; j++) + pubsubSubscribePattern(c,c->argv[j]); +} + +static void punsubscribeCommand(redisClient *c) { + if (c->argc == 1) { + pubsubUnsubscribeAllPatterns(c,1); + return; + } else { + int j; + + for (j = 1; j < c->argc; j++) + pubsubUnsubscribePattern(c,c->argv[j],1); + } +} + +static void publishCommand(redisClient *c) { + int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); + addReplyLongLong(c,receivers); +} + +/* ===================== WATCH (CAS alike for MULTI/EXEC) =================== + * + * The implementation uses a per-DB hash table mapping keys to list of clients + * WATCHing those keys, so that given a key that is going to be modified + * we can mark all the associated clients as dirty. + * + * Also every client contains a list of WATCHed keys so that's possible to + * un-watch such keys when the client is freed or when UNWATCH is called. */ + +/* In the client->watched_keys list we need to use watchedKey structures + * as in order to identify a key in Redis we need both the key name and the + * DB */ +typedef struct watchedKey { + robj *key; + redisDb *db; +} watchedKey; + +/* Watch for the specified key */ +static void watchForKey(redisClient *c, robj *key) { + list *clients = NULL; + listIter li; + listNode *ln; + watchedKey *wk; + + /* Check if we are already watching for this key */ + listRewind(c->watched_keys,&li); + while((ln = listNext(&li))) { + wk = listNodeValue(ln); + if (wk->db == c->db && equalStringObjects(key,wk->key)) + return; /* Key already watched */ + } + /* This key is not already watched in this DB. 
Let's add it */ + clients = dictFetchValue(c->db->watched_keys,key); + if (!clients) { + clients = listCreate(); + dictAdd(c->db->watched_keys,key,clients); + incrRefCount(key); + } + listAddNodeTail(clients,c); + /* Add the new key to the lits of keys watched by this client */ + wk = zmalloc(sizeof(*wk)); + wk->key = key; + wk->db = c->db; + incrRefCount(key); + listAddNodeTail(c->watched_keys,wk); +} + +/* Unwatch all the keys watched by this client. To clean the EXEC dirty + * flag is up to the caller. */ +static void unwatchAllKeys(redisClient *c) { + listIter li; + listNode *ln; + + if (listLength(c->watched_keys) == 0) return; + listRewind(c->watched_keys,&li); + while((ln = listNext(&li))) { + list *clients; + watchedKey *wk; + + /* Lookup the watched key -> clients list and remove the client + * from the list */ + wk = listNodeValue(ln); + clients = dictFetchValue(wk->db->watched_keys, wk->key); + assert(clients != NULL); + listDelNode(clients,listSearchKey(clients,c)); + /* Kill the entry at all if this was the only client */ + if (listLength(clients) == 0) + dictDelete(wk->db->watched_keys, wk->key); + /* Remove this watched key from the client->watched list */ + listDelNode(c->watched_keys,ln); + decrRefCount(wk->key); + zfree(wk); + } +} + +/* "Touch" a key, so that if this key is being WATCHed by some client the + * next EXEC will fail. 
*/ +static void touchWatchedKey(redisDb *db, robj *key) { + list *clients; + listIter li; + listNode *ln; + + if (dictSize(db->watched_keys) == 0) return; + clients = dictFetchValue(db->watched_keys, key); + if (!clients) return; + + /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ + /* Check if we are already watching for this key */ + listRewind(clients,&li); + while((ln = listNext(&li))) { + redisClient *c = listNodeValue(ln); + + c->flags |= REDIS_DIRTY_CAS; + } +} + +/* On FLUSHDB or FLUSHALL all the watched keys that are present before the + * flush but will be deleted as effect of the flushing operation should + * be touched. "dbid" is the DB that's getting the flush. -1 if it is + * a FLUSHALL operation (all the DBs flushed). */ +static void touchWatchedKeysOnFlush(int dbid) { + listIter li1, li2; + listNode *ln; + + /* For every client, check all the waited keys */ + listRewind(server.clients,&li1); + while((ln = listNext(&li1))) { + redisClient *c = listNodeValue(ln); + listRewind(c->watched_keys,&li2); + while((ln = listNext(&li2))) { + watchedKey *wk = listNodeValue(ln); + + /* For every watched key matching the specified DB, if the + * key exists, mark the client as dirty, as the key will be + * removed. */ + if (dbid == -1 || wk->db->id == dbid) { + if (dictFind(wk->db->dict, wk->key->ptr) != NULL) + c->flags |= REDIS_DIRTY_CAS; + } + } + } +} + +static void watchCommand(redisClient *c) { + int j; + + if (c->flags & REDIS_MULTI) { + addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n")); + return; + } + for (j = 1; j < c->argc; j++) + watchForKey(c,c->argv[j]); + addReply(c,shared.ok); +} + +static void unwatchCommand(redisClient *c) { + unwatchAllKeys(c); + c->flags &= (~REDIS_DIRTY_CAS); + addReply(c,shared.ok); +} + /* ================================= Debugging ============================== */ +/* Compute the sha1 of string at 's' with 'len' bytes long. + * The SHA1 is then xored againt the string pointed by digest. 
+ * Since xor is commutative, this operation is used in order to + * "add" digests relative to unordered elements. + * + * So digest(a,b,c,d) will be the same of digest(b,a,c,d) */ +static void xorDigest(unsigned char *digest, void *ptr, size_t len) { + SHA1_CTX ctx; + unsigned char hash[20], *s = ptr; + int j; + + SHA1Init(&ctx); + SHA1Update(&ctx,s,len); + SHA1Final(hash,&ctx); + + for (j = 0; j < 20; j++) + digest[j] ^= hash[j]; +} + +static void xorObjectDigest(unsigned char *digest, robj *o) { + o = getDecodedObject(o); + xorDigest(digest,o->ptr,sdslen(o->ptr)); + decrRefCount(o); +} + +/* This function instead of just computing the SHA1 and xoring it + * against diget, also perform the digest of "digest" itself and + * replace the old value with the new one. + * + * So the final digest will be: + * + * digest = SHA1(digest xor SHA1(data)) + * + * This function is used every time we want to preserve the order so + * that digest(a,b,c,d) will be different than digest(b,c,d,a) + * + * Also note that mixdigest("foo") followed by mixdigest("bar") + * will lead to a different digest compared to "fo", "obar". + */ +static void mixDigest(unsigned char *digest, void *ptr, size_t len) { + SHA1_CTX ctx; + char *s = ptr; + + xorDigest(digest,s,len); + SHA1Init(&ctx); + SHA1Update(&ctx,digest,20); + SHA1Final(digest,&ctx); +} + +static void mixObjectDigest(unsigned char *digest, robj *o) { + o = getDecodedObject(o); + mixDigest(digest,o->ptr,sdslen(o->ptr)); + decrRefCount(o); +} + +/* Compute the dataset digest. Since keys, sets elements, hashes elements + * are not ordered, we use a trick: every aggregate digest is the xor + * of the digests of their elements. This way the order will not change + * the result. For list instead we use a feedback entering the output digest + * as input in order to ensure that a different ordered list will result in + * a different digest. 
*/ +static void computeDatasetDigest(unsigned char *final) { /* Fills the 20-byte buffer final: for every non-empty DB the DB index is mixed in, then one digest per key (key name, type, value elements, expire marker) is XORed into final. */ + unsigned char digest[20]; + char buf[128]; + dictIterator *di = NULL; + dictEntry *de; + int j; + uint32_t aux; + + memset(final,0,20); /* Start with a clean result */ + + for (j = 0; j < server.dbnum; j++) { + redisDb *db = server.db+j; + + if (dictSize(db->dict) == 0) continue; /* empty DBs contribute nothing, not even their index */ + di = dictGetIterator(db->dict); + + /* hash the DB id, so the same dataset moved in a different + * DB will lead to a different digest */ + aux = htonl(j); + mixDigest(final,&aux,sizeof(aux)); + + /* Iterate this DB writing every entry */ + while((de = dictNext(di)) != NULL) { + sds key; + robj *keyobj, *o; + time_t expiretime; + + memset(digest,0,20); /* This key-val digest */ + key = dictGetEntryKey(de); + keyobj = createStringObject(key,sdslen(key)); /* temporary object wrapping the key; released with decrRefCount at the end of this iteration */ + + mixDigest(digest,key,sdslen(key)); + + /* Make sure the key is loaded if VM is active */ + o = lookupKeyRead(db,keyobj); /* NOTE(review): o is dereferenced just below without a NULL check, while the populate branch of debugCommand treats a NULL return from lookupKeyRead as possible - confirm it cannot happen here */ + + aux = htonl(o->type); + mixDigest(digest,&aux,sizeof(aux)); + expiretime = getExpire(db,keyobj); + + /* Save the key and associated value */ + if (o->type == REDIS_STRING) { + mixObjectDigest(digest,o); + } else if (o->type == REDIS_LIST) { + listTypeIterator *li = listTypeInitIterator(o,0,REDIS_TAIL); + listTypeEntry entry; + while(listTypeNext(li,&entry)) { + robj *eleobj = listTypeGet(&entry); + mixObjectDigest(digest,eleobj); + decrRefCount(eleobj); /* each object returned by listTypeGet is released once it has been mixed in */ + } + listTypeReleaseIterator(li); + } else if (o->type == REDIS_SET) { + dict *set = o->ptr; + dictIterator *di = dictGetIterator(set); + dictEntry *de; /* di and de declared in this branch hide the outer DB-level di and de until the branch closes */ + + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetEntryKey(de); + + xorObjectDigest(digest,eleobj); /* members are combined with the xor helper rather than the mix helper; presumably so that iteration order does not matter - TODO confirm */ + } + dictReleaseIterator(di); + } else if (o->type == REDIS_ZSET) { + zset *zs = o->ptr; + dictIterator *di = dictGetIterator(zs->dict); + dictEntry *de; /* same shadowing of the outer di and de as in the SET branch */ + + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetEntryKey(de); + double *score = dictGetEntryVal(de); + unsigned char eledigest[20]; + + 
snprintf(buf,sizeof(buf),"%.17g",*score); /* the score is rendered as text with 17 significant digits before being mixed in */ + memset(eledigest,0,20); + mixObjectDigest(eledigest,eleobj); + mixDigest(eledigest,buf,strlen(buf)); + xorDigest(digest,eledigest,20); /* per-element digest (member plus score text) is XORed into the key digest */ + } + dictReleaseIterator(di); + } else if (o->type == REDIS_HASH) { + hashTypeIterator *hi; + robj *obj; + + hi = hashTypeInitIterator(o); + while (hashTypeNext(hi) != REDIS_ERR) { + unsigned char eledigest[20]; + + memset(eledigest,0,20); + obj = hashTypeCurrent(hi,REDIS_HASH_KEY); + mixObjectDigest(eledigest,obj); + decrRefCount(obj); + obj = hashTypeCurrent(hi,REDIS_HASH_VALUE); + mixObjectDigest(eledigest,obj); + decrRefCount(obj); + xorDigest(digest,eledigest,20); /* field and value are mixed into one per-pair digest, which is then XORed into the key digest */ + } + hashTypeReleaseIterator(hi); + } else { + redisPanic("Unknown object type"); + } + /* If the key has an expire, add it to the mix */ + if (expiretime != -1) xorDigest(digest,"!!expire!!",10); /* only this fixed 10-byte marker is added; the expire time value itself is not part of the digest */ + /* We can finally xor the key-val digest to the final digest */ + xorDigest(final,digest,20); + decrRefCount(keyobj); + } + dictReleaseIterator(di); + } +} + static void debugCommand(redisClient *c) { if (!strcasecmp(c->argv[1]->ptr,"segfault")) { *((char*)-1) = 'x'; @@ -9186,17 +11278,16 @@ static void debugCommand(redisClient *c) { redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF"); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) { - dictEntry *de = dictFind(c->db->dict,c->argv[2]); - robj *key, *val; + dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr); /* the lookup now passes argv[2]->ptr instead of the argument object itself */ + robj *val; if (!de) { addReply(c,shared.nokeyerr); return; } - key = dictGetEntryKey(de); val = dictGetEntryVal(de); - if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING)) { + if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY || + val->storage == REDIS_VM_SWAPPING)) { char *strenc; char buf[128]; @@ -9207,20 +11298,25 @@ static void debugCommand(redisClient *c) { strenc = buf; } addReplySds(c,sdscatprintf(sdsempty(), - "+Key at:%p refcount:%d, value at:%p 
refcount:%d " + "+Value at:%p refcount:%d " "encoding:%s serializedlength:%lld\r\n", - (void*)key, key->refcount, (void*)val, val->refcount, + (void*)val, val->refcount, strenc, (long long) rdbSavedObjectLen(val,NULL))); } else { + vmpointer *vp = (vmpointer*) val; /* in this branch the stored value is reinterpreted as a vmpointer to read page and usedpages */ addReplySds(c,sdscatprintf(sdsempty(), - "+Key at:%p refcount:%d, value swapped at: page %llu " + "+Value swapped at: page %llu " "using %llu pages\r\n", - (void*)key, key->refcount, (unsigned long long) key->vm.page, - (unsigned long long) key->vm.usedpages)); + (unsigned long long) vp->page, + (unsigned long long) vp->usedpages)); } + } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) { + lookupKeyRead(c->db,c->argv[2]); /* the return value is ignored, so OK is replied whether or not the key exists */ + addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) { - dictEntry *de = dictFind(c->db->dict,c->argv[2]); - robj *key, *val; + dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr); + robj *val; + vmpointer *vp; if (!server.vm_enabled) { addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n")); @@ -9230,32 +11326,67 @@ static void debugCommand(redisClient *c) { addReply(c,shared.nokeyerr); return; } - key = dictGetEntryKey(de); val = dictGetEntryVal(de); - /* If the key is shared we want to create a copy */ - if (key->refcount > 1) { - robj *newkey = dupStringObject(key); - decrRefCount(key); - key = dictGetEntryKey(de) = newkey; - } /* Swap it */ - if (key->storage != REDIS_VM_MEMORY) { + if (val->storage != REDIS_VM_MEMORY) { addReplySds(c,sdsnew("-ERR This key is not in memory\r\n")); - } else if (vmSwapObjectBlocking(key,val) == REDIS_OK) { - dictGetEntryVal(de) = NULL; + } else if (val->refcount != 1) { /* values referenced from more than one place are refused rather than swapped */ + addReplySds(c,sdsnew("-ERR Object is shared\r\n")); + } else if ((vp = vmSwapObjectBlocking(val)) != NULL) { + dictGetEntryVal(de) = vp; /* on success the dictionary entry value is replaced by the returned vmpointer */ addReply(c,shared.ok); } else { addReply(c,shared.err); } + } else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) { + long keys, j; + robj *key, *val; + char buf[128]; + + 
if (getLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != REDIS_OK) + return; + for (j = 0; j < keys; j++) { + snprintf(buf,sizeof(buf),"key:%lu",j); /* NOTE(review): j is a signed long but is formatted with the unsigned lu conversion, here and for the value below */ + key = createStringObject(buf,strlen(buf)); + if (lookupKeyRead(c->db,key) != NULL) { /* keys that already exist are left untouched */ + decrRefCount(key); + continue; + } + snprintf(buf,sizeof(buf),"value:%lu",j); + val = createStringObject(buf,strlen(buf)); + dbAdd(c->db,key,val); + decrRefCount(key); /* only the key object is released here; val is not */ + } + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) { + unsigned char digest[20]; + sds d = sdsnew("+"); + int j; + + computeDatasetDigest(digest); + for (j = 0; j < 20; j++) + d = sdscatprintf(d, "%02x",digest[j]); /* the 20 digest bytes become 40 lowercase hex characters after the leading plus sign */ + + d = sdscatlen(d,"\r\n",2); + addReplySds(c,d); } else { addReplySds(c,sdsnew( - "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT |SWAPOUT |RELOAD]\r\n")); + "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT |SWAPIN |SWAPOUT |RELOAD]\r\n")); } } static void _redisAssert(char *estr, char *file, int line) { redisLog(REDIS_WARNING,"=== ASSERTION FAILED ==="); - redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true\n",file,line,estr); + redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true",file,line,estr); +#ifdef HAVE_BACKTRACE + redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)"); + *((char*)-1) = 'x'; /* deliberate write through an invalid address; per the message logged just above it is meant to raise SIGSEGV */ +#endif +} + +static void _redisPanic(char *msg, char *file, int line) { /* logs msg together with the file and line passed in; the tail of this function lies in diff context that is cut off by the next hunk */ + redisLog(REDIS_WARNING,"!!! Software Failure. Press left mouse button to continue"); + redisLog(REDIS_WARNING,"Guru Meditation: %s #%s:%d",msg,file,line); #ifdef HAVE_BACKTRACE redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)"); *((char*)-1) = 'x'; @@ -9281,7 +11412,7 @@ int linuxOvercommitMemoryValue(void) { void linuxOvercommitMemoryWarning(void) { if (linuxOvercommitMemoryValue() == 0) { - redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. 
To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); + redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); } } #endif /* __linux__ */ @@ -9311,7 +11442,8 @@ static void daemonize(void) { } static void version() { - printf("Redis server version %s\n", REDIS_VERSION); + printf("Redis server version %s (%s:%d)\n", REDIS_VERSION, + redisGitSHA1(), atoi(redisGitDirty()) > 0); /* the last argument is the 0 or 1 result of the greater-than comparison, not the parsed number */ exit(0); } @@ -9325,6 +11457,7 @@ int main(int argc, char **argv) { time_t start; initServerConfig(); + sortCommandTable(); /* invoked right after initServerConfig and before argv is examined */ if (argc == 2) { if (strcmp(argv[1], "-v") == 0 || strcmp(argv[1], "--version") == 0) version(); @@ -9378,7 +11511,7 @@ static void *getMcontextEip(ucontext_t *uc) { return (void*) uc->uc_mcontext->__ss.__rip; #else return (void*) uc->uc_mcontext->__ss.__eip; - #endif + #endif #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__) return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */ #elif defined(__ia64__) /* Linux IA64 */ @@ -9403,7 +11536,7 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) { redisLog(REDIS_WARNING, "%s",infostring); /* It's not safe to sdsfree() the returned string under memory * corruption conditions. 
Let it leak as we are going to abort */ - + trace_size = backtrace(trace, 100); /* overwrite sigaction with caller's address */ if (getMcontextEip(uc) != NULL) { @@ -9425,6 +11558,13 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) { _exit(0); } +static void sigtermHandler(int sig) { /* SIGTERM handler: logs a warning and sets server.shutdown_asap; nothing else is done here */ + REDIS_NOTUSED(sig); + + redisLog(REDIS_WARNING,"SIGTERM received, scheduling shutting down..."); /* NOTE(review): redisLog is called from signal context - confirm it is async-signal-safe */ + server.shutdown_asap = 1; +} + static void setupSigSegvAction(void) { struct sigaction act; @@ -9438,6 +11578,10 @@ static void setupSigSegvAction(void) { sigaction (SIGFPE, &act, NULL); sigaction (SIGILL, &act, NULL); sigaction (SIGBUS, &act, NULL); + + act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND; /* SA_SIGINFO is not among these flags, matching the one-argument handler assigned to sa_handler below; per POSIX, SA_RESETHAND resets the disposition to the default on entry to the handler */ + act.sa_handler = sigtermHandler; + sigaction (SIGTERM, &act, NULL); return; }