#include <pthread.h>
#include <syslog.h>
#include <netinet/in.h>
+#include <lua.h>
-#include "ae.h" /* Event driven programming library */
-#include "sds.h" /* Dynamic safe strings */
-#include "dict.h" /* Hash tables */
-#include "adlist.h" /* Linked lists */
+#include "ae.h" /* Event driven programming library */
+#include "sds.h" /* Dynamic safe strings */
+#include "dict.h" /* Hash tables */
+#include "adlist.h" /* Linked lists */
#include "zmalloc.h" /* total memory usage aware version of malloc/free */
-#include "anet.h" /* Networking the easy way */
-#include "zipmap.h" /* Compact string -> string data structure */
+#include "anet.h" /* Networking the easy way */
+#include "zipmap.h" /* Compact string -> string data structure */
#include "ziplist.h" /* Compact list data structure */
-#include "intset.h" /* Compact integer set structure */
-#include "version.h"
+#include "intset.h" /* Compact integer set structure */
+#include "version.h" /* Version macro */
+#include "util.h" /* Misc functions useful in many places */
/* Error codes */
#define REDIS_OK 0
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
-#define REDIS_STATIC_ARGS 8
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_SHARED_INTEGERS 10000
#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
#define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */
+#define REDIS_AUTO_AOFREWRITE_PERC 100
+#define REDIS_AUTO_AOFREWRITE_MIN_SIZE (1024*1024)
+#define REDIS_SLOWLOG_LOG_SLOWER_THAN 10000
+#define REDIS_SLOWLOG_MAX_LEN 64
/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
#define REDIS_ZSET 3
#define REDIS_HASH 4
#define REDIS_VMPOINTER 8
+
/* Object types only used for persistence in .rdb files */
#define REDIS_HASH_ZIPMAP 9
#define REDIS_LIST_ZIPLIST 10
#define REDIS_SET_INTSET 11
+#define REDIS_ZSET_ZIPLIST 12
/* Objects encoding. Some kind of objects like Strings and Hashes can be
* internally represented in multiple ways. The 'encoding' field of the object
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
-/* Scheduled IO opeations flags. */
-#define REDIS_IO_LOAD 1
-#define REDIS_IO_SAVE 2
-#define REDIS_IO_LOADINPROG 4
-#define REDIS_IO_SAVEINPROG 8
-
-/* Generic IO flags */
-#define REDIS_IO_ONLYLOADS 1
-#define REDIS_IO_ASAP 2
-
-#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
-#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
-
/* Client flags */
#define REDIS_SLAVE 1 /* This client is a slave server */
#define REDIS_MASTER 2 /* This client is a master server */
#define REDIS_MONITOR 4 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 8 /* This client is in a MULTI context */
#define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */
-#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */
#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */
#define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */
#define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in
server.unblocked_clients */
+#define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */
/* Client request types */
#define REDIS_REQ_INLINE 1
#define REDIS_REQ_MULTIBULK 2
/* Slave replication state - slave side */
-#define REDIS_REPL_NONE 0 /* No active replication */
-#define REDIS_REPL_CONNECT 1 /* Must connect to master */
-#define REDIS_REPL_TRANSFER 2 /* Receiving .rdb from master */
-#define REDIS_REPL_CONNECTED 3 /* Connected to master */
+#define REDIS_REPL_NONE 0 /* No active replication */
+#define REDIS_REPL_CONNECT 1 /* Must connect to master */
+#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
+#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
+#define REDIS_REPL_CONNECTED 4 /* Connected to master */
+
+/* Synchronous read timeout - slave side */
+#define REDIS_REPL_SYNCIO_TIMEOUT 5
/* Slave replication state - from the point of view of master
* Note that in SEND_BULK and ONLINE state the slave receives new updates
#define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512
#define REDIS_LIST_MAX_ZIPLIST_VALUE 64
#define REDIS_SET_MAX_INTSET_ENTRIES 512
+#define REDIS_ZSET_MAX_ZIPLIST_ENTRIES 128
+#define REDIS_ZSET_MAX_ZIPLIST_VALUE 64
/* Sets operations codes */
#define REDIS_OP_UNION 0
#define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
#define REDIS_MAXMEMORY_NO_EVICTION 5
-/* Diskstore background saving thread states */
-#define REDIS_BGSAVE_THREAD_UNACTIVE 0
-#define REDIS_BGSAVE_THREAD_ACTIVE 1
-#define REDIS_BGSAVE_THREAD_DONE_OK 2
-#define REDIS_BGSAVE_THREAD_DONE_ERR 3
+/* Scripting */
+#define REDIS_LUA_TIME_LIMIT 60000 /* milliseconds */
/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
- dict *io_keys; /* Keys with clients waiting for DS I/O */
- dict *io_negcache; /* Negative caching for disk store */
- dict *io_queued; /* Queued IO operations hash table */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
sds querybuf;
int argc;
robj **argv;
+ struct redisCommand *cmd;
int reqtype;
int multibulklen; /* number of multi bulk arguments left to read */
long bulklen; /* length of bulk argument in multi bulk request */
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space,
*colon, *nullbulk, *nullmultibulk, *queued,
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
- *outofrangeerr, *loadingerr, *plus,
+ *outofrangeerr, *noscripterr, *loadingerr, *plus,
*select0, *select1, *select2, *select3, *select4,
*select5, *select6, *select7, *select8, *select9,
*messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
*integers[REDIS_SHARED_INTEGERS];
};
+/* ZSETs use a specialized version of Skiplists */
+typedef struct zskiplistNode {
+ robj *obj;
+ double score;
+ struct zskiplistNode *backward;
+ struct zskiplistLevel {
+ struct zskiplistNode *forward;
+ unsigned int span;
+ } level[];
+} zskiplistNode;
+
+typedef struct zskiplist {
+ struct zskiplistNode *header, *tail;
+ unsigned long length;
+ int level;
+} zskiplist;
+
+typedef struct zset {
+ dict *dict;
+ zskiplist *zsl;
+} zset;
+
/*-----------------------------------------------------------------------------
* Redis cluster data structures
*----------------------------------------------------------------------------*/
clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
clusterNode *slots[REDIS_CLUSTER_SLOTS];
+ zskiplist *slots_to_keys;
} clusterState;
/* Redis cluster messages header */
struct redisServer {
/* General */
- pthread_t mainthread;
redisDb *db;
dict *commands; /* Command table hahs table */
aeEventLoop *el;
long long stat_evictedkeys; /* number of evicted keys (maxmemory) */
long long stat_keyspace_hits; /* number of successful lookups of keys */
long long stat_keyspace_misses; /* number of failed lookups of keys */
+ size_t stat_peak_memory; /* max used memory record */
+ long long stat_fork_time; /* time needed to perform latets fork() */
+ list *slowlog;
+ long long slowlog_entry_id;
+ long long slowlog_log_slower_than;
+ unsigned long slowlog_max_len;
/* Configuration */
int verbosity;
int maxidletime;
int appendonly;
int appendfsync;
int no_appendfsync_on_rewrite;
+ int auto_aofrewrite_perc; /* Rewrite AOF if % growth is > M and... */
+ off_t auto_aofrewrite_min_size; /* the AOF file is at least N bytes. */
+ off_t auto_aofrewrite_base_size;/* AOF size on latest startup or rewrite. */
+ off_t appendonly_current_size; /* AOF current size. */
+ int aofrewrite_scheduled; /* Rewrite once BGSAVE terminates. */
int shutdown_asap;
int activerehashing;
char *requirepass;
char *pidfile;
pid_t bgsavechildpid;
pid_t bgrewritechildpid;
- int bgsavethread_state;
- pthread_mutex_t bgsavethread_mutex;
- pthread_t bgsavethread;
sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
sds aofbuf; /* AOF buffer, written before entering the event loop */
struct saveparam *saveparams;
char *masterhost;
int masterport;
redisClient *master; /* client that is master for this slave */
+ int repl_syncio_timeout; /* timeout for synchronous I/O calls */
int replstate; /* replication status if the instance is a slave */
off_t repl_transfer_left; /* bytes left reading .rdb */
int repl_transfer_s; /* slave -> master SYNC socket */
char *repl_transfer_tmpfile; /* slave-> master SYNC temp file name */
time_t repl_transfer_lastio; /* unix time of the latest read, for timeout */
int repl_serve_stale_data; /* Serve stale data when link is down? */
+ time_t repl_down_since; /* unix time at which link with master went down */
/* Limits */
unsigned int maxclients;
unsigned long long maxmemory;
int maxmemory_samples;
/* Blocked clients */
unsigned int bpop_blocked_clients;
- unsigned int cache_blocked_clients;
list *unblocked_clients; /* list of clients to unblock before next loop */
- list *cache_io_queue; /* IO operations queue */
- int cache_flush_delay; /* seconds to wait before flushing keys */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
int sort_alpha;
int sort_bypattern;
- /* Virtual memory configuration */
- int ds_enabled; /* backend disk in redis.conf */
- char *ds_path; /* location of the disk store on disk */
- unsigned long long cache_max_memory;
/* Zip structure config */
size_t hash_max_zipmap_entries;
size_t hash_max_zipmap_value;
size_t list_max_ziplist_entries;
size_t list_max_ziplist_value;
size_t set_max_intset_entries;
+ size_t zset_max_ziplist_entries;
+ size_t zset_max_ziplist_value;
time_t unixtime; /* Unix time sampled every second. */
- /* Virtual memory I/O threads stuff */
- /* An I/O thread process an element taken from the io_jobs queue and
- * put the result of the operation in the io_done list. While the
- * job is being processed, it's put on io_processing queue. */
- list *io_newjobs; /* List of VM I/O jobs yet to be processed */
- list *io_processing; /* List of VM I/O jobs being processed */
- list *io_processed; /* List of VM I/O jobs already processed */
- list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
- pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
- pthread_cond_t io_condvar; /* I/O threads conditional variable */
- pthread_attr_t io_threads_attr; /* attributes for threads creation */
- int io_active_threads; /* Number of running I/O threads */
- int vm_max_threads; /* Max number of I/O threads running at the same time */
- /* Our main thread is blocked on the event loop, locking for sockets ready
- * to be read or written, so when a threaded I/O operation is ready to be
- * processed by the main thread, the I/O thread will use a unix pipe to
- * awake the main thread. The followings are the two pipe FDs. */
- int io_ready_pipe_read;
- int io_ready_pipe_write;
- /* Virtual memory stats */
- unsigned long long vm_stats_used_pages;
- unsigned long long vm_stats_swapped_objects;
- unsigned long long vm_stats_swapouts;
- unsigned long long vm_stats_swapins;
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
/* Misc */
unsigned lruclock:22; /* clock incrementing every minute, for LRU */
unsigned lruclock_padding:10;
+ /* Cluster */
int cluster_enabled;
clusterState cluster;
+ /* Scripting */
+ lua_State *lua; /* The Lua interpreter. We use just one for all clients */
+ redisClient *lua_client; /* The "fake client" to query Redis from Lua */
+ dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
+ long long lua_time_limit;
+ long long lua_time_start;
};
typedef struct pubsubPattern {
int arity;
int flags;
/* Use a function to determine keys arguments in a command line.
- * Used both for diskstore preloading and Redis Cluster. */
+ * Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
robj *pattern;
} redisSortOperation;
-/* ZSETs use a specialized version of Skiplists */
-typedef struct zskiplistNode {
- robj *obj;
- double score;
- struct zskiplistNode *backward;
- struct zskiplistLevel {
- struct zskiplistNode *forward;
- unsigned int span;
- } level[];
-} zskiplistNode;
-
-typedef struct zskiplist {
- struct zskiplistNode *header, *tail;
- unsigned long length;
- int level;
-} zskiplist;
-
-typedef struct zset {
- dict *dict;
- zskiplist *zsl;
-} zset;
-
-/* DIsk store threaded I/O request message */
-#define REDIS_IOJOB_LOAD 0
-#define REDIS_IOJOB_SAVE 1
-
-typedef struct iojob {
- int type; /* Request type, REDIS_IOJOB_* */
- redisDb *db;/* Redis database */
- robj *key; /* This I/O request is about this key */
- robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this
- * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
- time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */
-} iojob;
-
-/* IO operations scheduled -- check dscache.c for more info */
-typedef struct ioop {
- int type;
- redisDb *db;
- robj *key;
- time_t ctime; /* This is the creation time of the entry. */
-} ioop;
-
/* Structure to hold list iteration abstraction. */
typedef struct {
robj *subject;
extern dictType setDictType;
extern dictType zsetDictType;
extern dictType clusterNodesDictType;
+extern dictType dbDictType;
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
dictType hashDictType;
void *dupClientReplyValue(void *o);
void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer);
+void rewriteClientCommandVector(redisClient *c, int argc, ...);
+void rewriteClientCommandArgument(redisClient *c, int i, robj *newval);
#ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
void unwatchAllKeys(redisClient *c);
void initClientMultiState(redisClient *c);
void freeClientMultiState(redisClient *c);
-void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
+void queueMultiCommand(redisClient *c);
void touchWatchedKey(redisDb *db, robj *key);
void touchWatchedKeysOnFlush(int dbid);
/* Redis object implementation */
void decrRefCount(void *o);
void incrRefCount(robj *o);
+robj *resetRefCount(robj *obj);
void freeStringObject(robj *o);
void freeListObject(robj *o);
void freeSetObject(robj *o);
robj *createObject(int type, void *ptr);
robj *createStringObject(char *ptr, size_t len);
robj *dupStringObject(robj *o);
+int isObjectRepresentableAsLongLong(robj *o, long long *llongval);
robj *tryObjectEncoding(robj *o);
robj *getDecodedObject(robj *o);
size_t stringObjectLen(robj *o);
robj *createIntsetObject(void);
robj *createHashObject(void);
robj *createZsetObject(void);
+robj *createZsetZiplistObject(void);
int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg);
int checkType(redisClient *c, robj *o, int type);
int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, const char *msg);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
-int syncWithMaster(void);
void updateSlavesWaitingBgsave(int bgsaveerr);
void replicationCron(void);
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
/* Sorted sets data type */
+
+/* Struct to hold a inclusive/exclusive range spec. */
+typedef struct {
+ double min, max;
+ int minex, maxex; /* are min or max exclusive? */
+} zrangespec;
+
zskiplist *zslCreate(void);
void zslFree(zskiplist *zsl);
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
+unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score);
+int zslDelete(zskiplist *zsl, double score, robj *obj);
+zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range);
+double zzlGetScore(unsigned char *sptr);
+void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
+void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
+unsigned int zsetLength(robj *zobj);
+void zsetConvert(robj *zobj, int encoding);
/* Core functions */
void freeMemoryIfNeeded(void);
void setupSignalHandlers(void);
struct redisCommand *lookupCommand(sds name);
struct redisCommand *lookupCommandByCString(char *s);
-void call(redisClient *c, struct redisCommand *cmd);
+void call(redisClient *c);
int prepareForShutdown();
void redisLog(int level, const char *fmt, ...);
+void redisLogRaw(int level, const char *msg);
void usage();
void updateDictResizePolicy(void);
int htNeedsResize(dict *dict);
void populateCommandTable(void);
void resetCommandTableStats(void);
-/* Disk store */
-int dsOpen(void);
-int dsClose(void);
-int dsSet(redisDb *db, robj *key, robj *val, time_t expire);
-robj *dsGet(redisDb *db, robj *key, time_t *expire);
-int dsDel(redisDb *db, robj *key);
-int dsExists(redisDb *db, robj *key);
-void dsFlushDb(int dbid);
-int dsRdbSaveBackground(char *filename);
-int dsRdbSave(char *filename);
-
-/* Disk Store Cache */
-void dsInit(void);
-void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask);
-void lockThreadedIO(void);
-void unlockThreadedIO(void);
-void freeIOJob(iojob *j);
-void queueIOJob(iojob *j);
-void waitEmptyIOJobsQueue(void);
-void processAllPendingIOJobs(void);
-int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
-int dontWaitForSwappedKey(redisClient *c, robj *key);
-void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
-int cacheFreeOneEntry(void);
-void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag);
-void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag);
-int cacheScheduleIOGetFlags(redisDb *db, robj *key);
-void cacheScheduleIO(redisDb *db, robj *key, int type);
-void cacheCron(void);
-int cacheKeyMayExist(redisDb *db, robj *key);
-void cacheSetKeyMayExist(redisDb *db, robj *key);
-void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
-void cacheForcePointInTime(void);
-
/* Set data type */
robj *setTypeCreate(robj *value);
int setTypeAdd(robj *subject, robj *value);
void freePubsubPattern(void *p);
int listMatchPubsubPattern(void *a, void *b);
-/* Utility functions */
-int stringmatchlen(const char *pattern, int patternLen,
- const char *string, int stringLen, int nocase);
-int stringmatch(const char *pattern, const char *string, int nocase);
-long long memtoll(const char *p, int *err);
-int ll2string(char *s, size_t len, long long value);
-int isStringRepresentableAsLong(sds s, long *longval);
-int isStringRepresentableAsLongLong(sds s, long long *longval);
-int isObjectRepresentableAsLongLong(robj *o, long long *llongval);
-
/* Configuration */
void loadServerConfig(char *filename);
void appendServerSaveParams(time_t seconds, int changes);
robj *lookupKeyWrite(redisDb *db, robj *key);
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply);
robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply);
-int dbAdd(redisDb *db, robj *key, robj *val);
-int dbReplace(redisDb *db, robj *key, robj *val);
+void dbAdd(redisDb *db, robj *key, robj *val);
+void dbOverwrite(redisDb *db, robj *key, robj *val);
+void setKey(redisDb *db, robj *key, robj *val);
int dbExists(redisDb *db, robj *key);
robj *dbRandomKey(redisDb *db);
int dbDelete(redisDb *db, robj *key);
int selectDb(redisClient *c, int id);
void signalModifiedKey(redisDb *db, robj *key);
void signalFlushedDb(int dbid);
+unsigned int GetKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count);
/* API to get key arguments from commands */
#define REDIS_GETKEYS_ALL 0
clusterNode *createClusterNode(char *nodename, int flags);
int clusterAddNode(clusterNode *node);
void clusterCron(void);
-clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot);
+clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
+
+/* Scripting */
+void scriptingInit(void);
/* Git SHA1 */
char *redisGitSHA1(void);
void migrateCommand(redisClient *c);
void dumpCommand(redisClient *c);
void objectCommand(redisClient *c);
+void clientCommand(redisClient *c);
+void evalCommand(redisClient *c);
+void evalShaCommand(redisClient *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));