X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/f85cd526c155a1b1302b2d080502d40227af00ec..ab52d1f4a8f3173622e0758d6f82f1aff0812b93:/src/redis.h diff --git a/src/redis.h b/src/redis.h index 27131e5d..af08145e 100644 --- a/src/redis.h +++ b/src/redis.h @@ -19,18 +19,19 @@ #include #include #include +#include -#include "ae.h" /* Event driven programming library */ -#include "sds.h" /* Dynamic safe strings */ -#include "dict.h" /* Hash tables */ -#include "adlist.h" /* Linked lists */ +#include "ae.h" /* Event driven programming library */ +#include "sds.h" /* Dynamic safe strings */ +#include "dict.h" /* Hash tables */ +#include "adlist.h" /* Linked lists */ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ -#include "anet.h" /* Networking the easy way */ -#include "zipmap.h" /* Compact string -> string data structure */ +#include "anet.h" /* Networking the easy way */ +#include "zipmap.h" /* Compact string -> string data structure */ #include "ziplist.h" /* Compact list data structure */ -#include "intset.h" /* Compact integer set structure */ -#include "version.h" -#include "util.h" +#include "intset.h" /* Compact integer set structure */ +#include "version.h" /* Version macro */ +#include "util.h" /* Misc functions useful in many places */ /* Error codes */ #define REDIS_OK 0 @@ -52,19 +53,22 @@ #define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */ #define REDIS_AUTO_AOFREWRITE_PERC 100 #define REDIS_AUTO_AOFREWRITE_MIN_SIZE (1024*1024) +#define REDIS_SLOWLOG_LOG_SLOWER_THAN 10000 +#define REDIS_SLOWLOG_MAX_LEN 64 /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ -/* Command flags: - * REDIS_CMD_DENYOOM: - * Commands marked with this flag will return an error when 'maxmemory' is - * set and the server is using more than 'maxmemory' bytes of memory. - * In short: commands with this flag are denied on low memory conditions. - * REDIS_CMD_FORCE_REPLICATION: - * Force replication even if dirty is 0. */ -#define REDIS_CMD_DENYOOM 4 -#define REDIS_CMD_FORCE_REPLICATION 8 +/* Command flags. Please check the command table defined in the redis.c file + * for more information about the meaning of every flag. */ +#define REDIS_CMD_WRITE 1 /* "w" flag */ +#define REDIS_CMD_READONLY 2 /* "r" flag */ +#define REDIS_CMD_DENYOOM 4 /* "m" flag */ +#define REDIS_CMD_FORCE_REPLICATION 8 /* "f" flag */ +#define REDIS_CMD_ADMIN 16 /* "a" flag */ +#define REDIS_CMD_PUBSUB 32 /* "p" flag */ +#define REDIS_CMD_NOSCRIPT 64 /* "s" flag */ +#define REDIS_CMD_RANDOM 128 /* "R" flag */ /* Object types */ #define REDIS_STRING 0 @@ -74,12 +78,6 @@ #define REDIS_HASH 4 #define REDIS_VMPOINTER 8 -/* Object types only used for persistence in .rdb files */ -#define REDIS_HASH_ZIPMAP 9 -#define REDIS_LIST_ZIPLIST 10 -#define REDIS_SET_INTSET 11 -#define REDIS_ZSET_ZIPLIST 12 - /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. */ @@ -124,30 +122,17 @@ #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ -/* Scheduled IO opeations flags. */ -#define REDIS_IO_LOAD 1 -#define REDIS_IO_SAVE 2 -#define REDIS_IO_LOADINPROG 4 -#define REDIS_IO_SAVEINPROG 8 - -/* Generic IO flags */ -#define REDIS_IO_ONLYLOADS 1 -#define REDIS_IO_ASAP 2 - -#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 -#define REDIS_THREAD_STACK_SIZE (1024*1024*4) - /* Client flags */ #define REDIS_SLAVE 1 /* This client is a slave server */ #define REDIS_MASTER 2 /* This client is a master server */ #define REDIS_MONITOR 4 /* This client is a slave monitor, see MONITOR */ #define REDIS_MULTI 8 /* This client is in a MULTI context */ #define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */ -#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ #define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */ #define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */ #define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in server.unblocked_clients */ +#define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */ /* Client request types */ #define REDIS_REQ_INLINE 1 @@ -222,17 +207,13 @@ #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4 #define REDIS_MAXMEMORY_NO_EVICTION 5 -/* Diskstore background saving thread states */ -#define REDIS_BGSAVE_THREAD_UNACTIVE 0 -#define REDIS_BGSAVE_THREAD_ACTIVE 1 -#define REDIS_BGSAVE_THREAD_DONE_OK 2 -#define REDIS_BGSAVE_THREAD_DONE_ERR 3 +/* Scripting */ +#define REDIS_LUA_TIME_LIMIT 60000 /* milliseconds */ /* We can print the stacktrace, so our assert is defined this way: */ +#define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1))) #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) #define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1) -void _redisAssert(char *estr, char *file, int line); -void _redisPanic(char *msg, char *file, int line); /*----------------------------------------------------------------------------- * Data types @@ -292,9 +273,6 @@ typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ - dict *io_keys; /* Keys with clients waiting for DS I/O */ - dict *io_negcache; /* Negative caching for disk store */ - dict *io_queued; /* Queued IO operations hash table */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -330,6 +308,7 @@ typedef struct redisClient { sds querybuf; int argc; robj **argv; + struct redisCommand *cmd; int reqtype; int multibulklen; /* number of multi bulk arguments left to read */ long bulklen; /* length of bulk argument in multi bulk request */ @@ -365,7 +344,7 @@ struct sharedObjectsStruct { robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space, *colon, *nullbulk, *nullmultibulk, *queued, *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, - *outofrangeerr, *loadingerr, *plus, + *outofrangeerr, *noscripterr, *loadingerr, *plus, *select0, *select1, *select2, *select3, *select4, *select5, *select6, *select7, *select8, *select9, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3, @@ -466,6 +445,7 @@ typedef struct { #define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */ #define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */ #define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */ +#define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propatagion */ /* Initially we don't know our "name", but we'll find it once we connect * to the first node, using the getsockname() function. Then we'll use this @@ -484,16 +464,28 @@ typedef struct { char nodename[REDIS_CLUSTER_NAMELEN]; } clusterMsgDataFail; +typedef struct { + uint32_t channel_len; + uint32_t message_len; + unsigned char bulk_data[8]; /* defined as 8 just for alignment concerns. */ +} clusterMsgDataPublish; + union clusterMsgData { /* PING, MEET and PONG */ struct { /* Array of N clusterMsgDataGossip structures */ clusterMsgDataGossip gossip[1]; } ping; + /* FAIL */ struct { clusterMsgDataFail about; } fail; + + /* PUBLISH */ + struct { + clusterMsgDataPublish msg; + } publish; }; typedef struct { @@ -516,7 +508,6 @@ typedef struct { struct redisServer { /* General */ - pthread_t mainthread; redisDb *db; dict *commands; /* Command table hahs table */ aeEventLoop *el; @@ -524,6 +515,7 @@ struct redisServer { int port; char *bindaddr; char *unixsocket; + mode_t unixsocketperm; int ipfd; int sofd; int cfd; @@ -549,6 +541,10 @@ struct redisServer { long long stat_keyspace_misses; /* number of failed lookups of keys */ size_t stat_peak_memory; /* max used memory record */ long long stat_fork_time; /* time needed to perform latets fork() */ + list *slowlog; + long long slowlog_entry_id; + long long slowlog_log_slower_than; + unsigned long slowlog_max_len; /* Configuration */ int verbosity; int maxidletime; @@ -571,12 +567,10 @@ struct redisServer { time_t lastfsync; int appendfd; int appendseldb; + time_t aof_flush_postponed_start; char *pidfile; pid_t bgsavechildpid; pid_t bgrewritechildpid; - int bgsavethread_state; - pthread_mutex_t bgsavethread_mutex; - pthread_t bgsavethread; sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ sds aofbuf; /* AOF buffer, written before entering the event loop */ struct saveparam *saveparams; @@ -612,19 +606,12 @@ struct redisServer { int maxmemory_samples; /* Blocked clients */ unsigned int bpop_blocked_clients; - unsigned int cache_blocked_clients; list *unblocked_clients; /* list of clients to unblock before next loop */ - list *cache_io_queue; /* IO operations queue */ - int cache_flush_delay; /* seconds to wait before flushing keys */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; int sort_alpha; int sort_bypattern; - /* Virtual memory configuration */ - int ds_enabled; /* backend disk in redis.conf */ - char *ds_path; /* location of the disk store on disk */ - unsigned long long cache_max_memory; /* Zip structure config */ size_t hash_max_zipmap_entries; size_t hash_max_zipmap_value; @@ -634,30 +621,6 @@ struct redisServer { size_t zset_max_ziplist_entries; size_t zset_max_ziplist_value; time_t unixtime; /* Unix time sampled every second. */ - /* Virtual memory I/O threads stuff */ - /* An I/O thread process an element taken from the io_jobs queue and - * put the result of the operation in the io_done list. While the - * job is being processed, it's put on io_processing queue. */ - list *io_newjobs; /* List of VM I/O jobs yet to be processed */ - list *io_processing; /* List of VM I/O jobs being processed */ - list *io_processed; /* List of VM I/O jobs already processed */ - list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */ - pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ - pthread_cond_t io_condvar; /* I/O threads conditional variable */ - pthread_attr_t io_threads_attr; /* attributes for threads creation */ - int io_active_threads; /* Number of running I/O threads */ - int vm_max_threads; /* Max number of I/O threads running at the same time */ - /* Our main thread is blocked on the event loop, locking for sockets ready - * to be read or written, so when a threaded I/O operation is ready to be - * processed by the main thread, the I/O thread will use a unix pipe to - * awake the main thread. The followings are the two pipe FDs. */ - int io_ready_pipe_read; - int io_ready_pipe_write; - /* Virtual memory stats */ - unsigned long long vm_stats_used_pages; - unsigned long long vm_stats_swapped_objects; - unsigned long long vm_stats_swapouts; - unsigned long long vm_stats_swapins; /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ @@ -667,6 +630,14 @@ struct redisServer { /* Cluster */ int cluster_enabled; clusterState cluster; + /* Scripting */ + lua_State *lua; /* The Lua interpreter. We use just one for all clients */ + redisClient *lua_client; /* The "fake client" to query Redis from Lua */ + dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ + long long lua_time_limit; + long long lua_time_start; + int lua_random_dirty; /* True if a random command was called during the + exection of the current script. */ }; typedef struct pubsubPattern { @@ -680,9 +651,10 @@ struct redisCommand { char *name; redisCommandProc *proc; int arity; - int flags; + char *sflags; /* Flags as string represenation, one char per flag. */ + int flags; /* The actual flags, obtained from the 'sflags' field. */ /* Use a function to determine keys arguments in a command line. - * Used both for diskstore preloading and Redis Cluster. */ + * Used for Redis Cluster redirect. */ redisGetKeysProc *getkeys_proc; /* What keys should be loaded in background when calling this command? */ int firstkey; /* The first argument that's a key (0 = no keys) */ @@ -709,27 +681,6 @@ typedef struct _redisSortOperation { robj *pattern; } redisSortOperation; -/* DIsk store threaded I/O request message */ -#define REDIS_IOJOB_LOAD 0 -#define REDIS_IOJOB_SAVE 1 - -typedef struct iojob { - int type; /* Request type, REDIS_IOJOB_* */ - redisDb *db;/* Redis database */ - robj *key; /* This I/O request is about this key */ - robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this - * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */ - time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */ -} iojob; - -/* IO operations scheduled -- check dscache.c for more info */ -typedef struct ioop { - int type; - redisDb *db; - robj *key; - time_t ctime; /* This is the creation time of the entry. */ -} ioop; - /* Structure to hold list iteration abstraction. */ typedef struct { robj *subject; @@ -780,6 +731,7 @@ extern struct sharedObjectsStruct shared; extern dictType setDictType; extern dictType zsetDictType; extern dictType clusterNodesDictType; +extern dictType dbDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; dictType hashDictType; @@ -819,6 +771,8 @@ void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); void getClientsMaxBuffers(unsigned long *longest_output_list, unsigned long *biggest_input_buffer); +void rewriteClientCommandVector(redisClient *c, int argc, ...); +void rewriteClientCommandArgument(redisClient *c, int i, robj *newval); #ifdef __GNUC__ void addReplyErrorFormat(redisClient *c, const char *fmt, ...) @@ -851,13 +805,14 @@ void popGenericCommand(redisClient *c, int where); void unwatchAllKeys(redisClient *c); void initClientMultiState(redisClient *c); void freeClientMultiState(redisClient *c); -void queueMultiCommand(redisClient *c, struct redisCommand *cmd); +void queueMultiCommand(redisClient *c); void touchWatchedKey(redisDb *db, robj *key); void touchWatchedKeysOnFlush(int dbid); /* Redis object implementation */ void decrRefCount(void *o); void incrRefCount(robj *o); +robj *resetRefCount(robj *obj); void freeStringObject(robj *o); void freeListObject(robj *o); void freeSetObject(robj *o); @@ -892,11 +847,6 @@ unsigned long estimateObjectIdleTime(robj *o); int syncWrite(int fd, char *ptr, ssize_t size, int timeout); int syncRead(int fd, char *ptr, ssize_t size, int timeout); int syncReadLine(int fd, char *ptr, ssize_t size, int timeout); -int fwriteBulkString(FILE *fp, char *s, unsigned long len); -int fwriteBulkDouble(FILE *fp, double d); -int fwriteBulkLongLong(FILE *fp, long long l); -int fwriteBulkObject(FILE *fp, robj *obj); -int fwriteBulkCount(FILE *fp, char prefix, int count); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); @@ -910,24 +860,10 @@ void loadingProgress(off_t pos); void stopLoading(void); /* RDB persistence */ -int rdbLoad(char *filename); -int rdbSaveBackground(char *filename); -void rdbRemoveTempFile(pid_t childpid); -int rdbSave(char *filename); -int rdbSaveObject(FILE *fp, robj *o); -off_t rdbSavedObjectLen(robj *o); -off_t rdbSavedObjectPages(robj *o); -robj *rdbLoadObject(int type, FILE *fp); -void backgroundSaveDoneHandler(int exitcode, int bysignal); -int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, time_t expireitme, time_t now); -int rdbLoadType(FILE *fp); -time_t rdbLoadTime(FILE *fp); -robj *rdbLoadStringObject(FILE *fp); -int rdbSaveType(FILE *fp, unsigned char type); -int rdbSaveLen(FILE *fp, uint32_t len); +#include "rdb.h" /* AOF persistence */ -void flushAppendOnlyFile(void); +void flushAppendOnlyFile(int force); void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc); void aofRemoveTempFile(pid_t childpid); int rewriteAppendOnlyFileBackground(void); @@ -962,9 +898,10 @@ int processCommand(redisClient *c); void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommandByCString(char *s); -void call(redisClient *c, struct redisCommand *cmd); +void call(redisClient *c); int prepareForShutdown(); void redisLog(int level, const char *fmt, ...); +void redisLogRaw(int level, const char *msg); void usage(); void updateDictResizePolicy(void); int htNeedsResize(dict *dict); @@ -972,40 +909,6 @@ void oom(const char *msg); void populateCommandTable(void); void resetCommandTableStats(void); -/* Disk store */ -int dsOpen(void); -int dsClose(void); -int dsSet(redisDb *db, robj *key, robj *val, time_t expire); -robj *dsGet(redisDb *db, robj *key, time_t *expire); -int dsDel(redisDb *db, robj *key); -int dsExists(redisDb *db, robj *key); -void dsFlushDb(int dbid); -int dsRdbSaveBackground(char *filename); -int dsRdbSave(char *filename); - -/* Disk Store Cache */ -void dsInit(void); -void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask); -void lockThreadedIO(void); -void unlockThreadedIO(void); -void freeIOJob(iojob *j); -void queueIOJob(iojob *j); -void waitEmptyIOJobsQueue(void); -void processAllPendingIOJobs(void); -int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); -int dontWaitForSwappedKey(redisClient *c, robj *key); -void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); -int cacheFreeOneEntry(void); -void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag); -void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag); -int cacheScheduleIOGetFlags(redisDb *db, robj *key); -void cacheScheduleIO(redisDb *db, robj *key, int type); -void cacheCron(void); -int cacheKeyMayExist(redisDb *db, robj *key); -void cacheSetKeyMayExist(redisDb *db, robj *key); -void cacheSetKeyDoesNotExist(redisDb *db, robj *key); -void cacheForcePointInTime(void); - /* Set data type */ robj *setTypeCreate(robj *value); int setTypeAdd(robj *subject, robj *value); @@ -1041,6 +944,7 @@ int pubsubUnsubscribeAllChannels(redisClient *c, int notify); int pubsubUnsubscribeAllPatterns(redisClient *c, int notify); void freePubsubPattern(void *p); int listMatchPubsubPattern(void *a, void *b); +int pubsubPublishMessage(robj *channel, robj *message); /* Configuration */ void loadServerConfig(char *filename); @@ -1087,6 +991,10 @@ clusterNode *createClusterNode(char *nodename, int flags); int clusterAddNode(clusterNode *node); void clusterCron(void); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); +void clusterPropagatePublish(robj *channel, robj *message); + +/* Scripting */ +void scriptingInit(void); /* Git SHA1 */ char *redisGitSHA1(void); @@ -1216,6 +1124,8 @@ void migrateCommand(redisClient *c); void dumpCommand(redisClient *c); void objectCommand(redisClient *c); void clientCommand(redisClient *c); +void evalCommand(redisClient *c); +void evalShaCommand(redisClient *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); @@ -1224,4 +1134,9 @@ void *malloc(size_t size) __attribute__ ((deprecated)); void *realloc(void *ptr, size_t size) __attribute__ ((deprecated)); #endif +/* Debugging stuff */ +void _redisAssertWithInfo(redisClient *c, robj *o, char *estr, char *file, int line); +void _redisAssert(char *estr, char *file, int line); +void _redisPanic(char *msg, char *file, int line); + #endif