{"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0},
{"setex",setexCommand,4,REDIS_CMD_DENYOOM,NULL,0,0,0},
{"append",appendCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
- {"substr",substrCommand,4,0,NULL,1,1,1},
{"strlen",strlenCommand,2,0,NULL,1,1,1},
{"del",delCommand,-2,0,NULL,0,0,0},
{"exists",existsCommand,2,0,NULL,1,1,1},
+ {"setbit",setbitCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1},
+ {"getbit",getbitCommand,3,0,NULL,1,1,1},
+ {"setrange",setrangeCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1},
+ {"getrange",getrangeCommand,4,0,NULL,1,1,1},
+ {"substr",getrangeCommand,4,0,NULL,1,1,1},
{"incr",incrCommand,2,REDIS_CMD_DENYOOM,NULL,1,1,1},
{"decr",decrCommand,2,REDIS_CMD_DENYOOM,NULL,1,1,1},
{"mget",mgetCommand,-2,0,NULL,1,-1,1},
{"rpop",rpopCommand,2,0,NULL,1,1,1},
{"lpop",lpopCommand,2,0,NULL,1,1,1},
{"brpop",brpopCommand,-3,0,NULL,1,1,1},
+ {"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1},
{"blpop",blpopCommand,-3,0,NULL,1,1,1},
{"llen",llenCommand,2,0,NULL,1,1,1},
{"lindex",lindexCommand,3,0,NULL,1,1,1},
{"lrange",lrangeCommand,4,0,NULL,1,1,1},
{"ltrim",ltrimCommand,4,0,NULL,1,1,1},
{"lrem",lremCommand,4,0,NULL,1,1,1},
- {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1},
+ {"rpoplpush",rpoplpushCommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1},
{"sadd",saddCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
{"srem",sremCommand,3,0,NULL,1,1,1},
{"smove",smoveCommand,4,0,NULL,1,2,1},
{"zcount",zcountCommand,4,0,NULL,1,1,1},
{"zrevrange",zrevrangeCommand,-4,0,NULL,1,1,1},
{"zcard",zcardCommand,2,0,NULL,1,1,1},
- {"zscore",zscoreCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
+ {"zscore",zscoreCommand,3,0,NULL,1,1,1},
{"zrank",zrankCommand,3,0,NULL,1,1,1},
{"zrevrank",zrevrankCommand,3,0,NULL,1,1,1},
{"hset",hsetCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1},
/*============================ Utility functions ============================ */
void redisLog(int level, const char *fmt, ...) {
+ const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING }; /* syslog priority for each log level, indexed by 'level' */
+ const char *c = ".-*#"; /* one marker character per log level, indexed by 'level' */
+ time_t now = time(NULL);
va_list ap;
FILE *fp;
- char *c = ".-*#";
char buf[64];
- time_t now;
+ char msg[REDIS_MAX_LOGMSG_LEN]; /* formatted message; vsnprintf below bounds the write to sizeof(msg) */
if (level < server.verbosity) return;
if (!fp) return;
va_start(ap, fmt);
- now = time(NULL);
- strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
- fprintf(fp,"[%d] %s %c ",(int)getpid(),buf,c[level]);
- vfprintf(fp, fmt, ap);
- fprintf(fp,"\n");
- fflush(fp);
+ vsnprintf(msg, sizeof(msg), fmt, ap); /* format once so the same text is reused for the stream and for syslog */
va_end(ap);
+ strftime(buf,sizeof(buf),"%d %b %H:%M:%S",localtime(&now));
+ fprintf(fp,"[%d] %s %c %s\n",(int)getpid(),buf,c[level],msg); /* line layout: [pid] timestamp level-marker message */
+ fflush(fp);
+
if (server.logfile) fclose(fp);
+
+ if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg); /* "%s" keeps msg from being interpreted as a format string */
}
/* Redis generally does not try to recover from out of memory conditions
}
}
-/* Sets type */
+/* Sets type and diskstore negative caching hash table */
dictType setDictType = {
dictEncObjHash, /* hash function */
NULL, /* key dup */
}
/* Close connections of timedout clients */
- if ((server.maxidletime && !(loops % 100)) || server.blpop_blocked_clients)
+ if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
closeTimedoutClients();
/* Check if a background saving or AOF rewrite in progress terminated */
}
updateDictResizePolicy();
}
- } else {
+ } else if (!server.ds_enabled) {
/* If there is not a background saving in progress check if
* we have to save now */
time_t now = time(NULL);
* in order to guarantee a strict consistency. */
if (server.masterhost == NULL) activeExpireCycle();
- /* Swap a few keys on disk if we are over the memory limit and VM
- * is enbled. Try to free objects from the free list first. */
- if (vmCanSwapOut()) {
- while (server.vm_enabled && zmalloc_used_memory() >
- server.vm_max_memory)
- {
- int retval;
-
- if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
- retval = (server.vm_max_threads == 0) ?
- vmSwapOneObjectBlocking() :
- vmSwapOneObjectThreaded();
- if (retval == REDIS_ERR && !(loops % 300) &&
- zmalloc_used_memory() >
- (server.vm_max_memory+server.vm_max_memory/10))
- {
- redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
- }
- /* Note that when using threade I/O we free just one object,
- * because anyway when the I/O thread in charge to swap this
- * object out will finish, the handler of completed jobs
- * will try to swap more objects if we are still out of memory. */
- if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
- }
- }
+ /* Remove a few cached objects from memory if we are over the
+ * configured memory limit */
+ if (server.ds_enabled) cacheCron();
+
+ /* Replication cron function -- used to reconnect to master and
+ * to detect transfer failures. */
+ if (!(loops % 10)) replicationCron();
- /* Check if we should connect to a MASTER */
- if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) {
- redisLog(REDIS_NOTICE,"Connecting to MASTER...");
- if (syncWithMaster() == REDIS_OK) {
- redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
- if (server.appendonly) rewriteAppendOnlyFileBackground();
- }
- }
return 100;
}
* for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop);
+ listNode *ln; /* function scope: used by both loops below */
+ redisClient *c; /* function scope: used by both loops below */
- /* Awake clients that got all the swapped keys they requested */
- if (server.vm_enabled && listLength(server.io_ready_clients)) {
+ /* Awake clients that got all the on disk keys they requested */
+ if (server.ds_enabled && listLength(server.io_ready_clients)) {
listIter li;
- listNode *ln;
listRewind(server.io_ready_clients,&li);
while((ln = listNext(&li))) {
- redisClient *c = ln->value;
+ c = ln->value;
struct redisCommand *cmd;
/* Resume the client. */
listDelNode(server.io_ready_clients,ln);
c->flags &= (~REDIS_IO_WAIT);
- server.vm_blocked_clients--;
+ server.cache_blocked_clients--; /* NOTE(review): presumably counts clients flagged REDIS_IO_WAIT (cleared just above) -- confirm */
aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c);
cmd = lookupCommand(c->argv[0]->ptr);
processInputBuffer(c);
}
}
+
+ /* Try to process pending commands for clients that were just unblocked. */
+ while (listLength(server.unblocked_clients)) {
+ ln = listFirst(server.unblocked_clients);
+ redisAssert(ln != NULL); /* the loop condition says the list is non-empty, so a first node is expected */
+ c = ln->value;
+ listDelNode(server.unblocked_clients,ln); /* dequeue the node before touching the client's input buffer */
+
+ /* Process remaining data in the input buffer. */
+ if (c->querybuf && sdslen(c->querybuf) > 0)
+ processInputBuffer(c);
+ }
+
/* Write the AOF buffer on disk */
flushAppendOnlyFile();
}
"-ERR source and destination objects are the same\r\n"));
shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
"-ERR index out of range\r\n"));
+ shared.loadingerr = createObject(REDIS_STRING,sdsnew(
+ "-LOADING Redis is loading the dataset in memory\r\n"));
shared.space = createObject(REDIS_STRING,sdsnew(" "));
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
server.verbosity = REDIS_VERBOSE;
server.maxidletime = REDIS_MAXIDLETIME;
server.saveparams = NULL;
+ server.loading = 0;
server.logfile = NULL; /* NULL = log on standard output */
+ server.syslog_enabled = 0;
+ server.syslog_ident = zstrdup("redis");
+ server.syslog_facility = LOG_LOCAL0;
server.glueoutputbuf = 1;
server.daemonize = 0;
server.appendonly = 0;
server.rdbcompression = 1;
server.activerehashing = 1;
server.maxclients = 0;
- server.blpop_blocked_clients = 0;
+ server.bpop_blocked_clients = 0;
server.maxmemory = 0;
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
server.maxmemory_samples = 3;
- server.vm_enabled = 0;
- server.vm_swap_file = zstrdup("/tmp/redis-%p.vm");
- server.vm_page_size = 256; /* 256 bytes per page */
- server.vm_pages = 1024*1024*100; /* 104 millions of pages */
- server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
- server.vm_max_threads = 4;
- server.vm_blocked_clients = 0;
+ server.ds_enabled = 0;
+ server.ds_path = sdsnew("/tmp/redis.ds");
+ server.cache_max_memory = 64LL*1024*1024; /* 64 MB of RAM */
+ server.cache_blocked_clients = 0;
server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
server.masterport = 6379;
server.master = NULL;
server.replstate = REDIS_REPL_NONE;
+ server.repl_serve_stale_data = 1;
/* Double constants initialization */
R_Zero = 0.0;
signal(SIGPIPE, SIG_IGN);
setupSigSegvAction();
- server.mainthread = pthread_self();
- server.devnull = fopen("/dev/null","w");
- if (server.devnull == NULL) {
- redisLog(REDIS_WARNING, "Can't open /dev/null: %s", server.neterr);
- exit(1);
+ if (server.syslog_enabled) {
+ openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
+ server.syslog_facility);
}
+
+ server.mainthread = pthread_self();
server.clients = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
- server.objfreelist = listCreate();
+ server.unblocked_clients = listCreate();
+ server.cache_flush_queue = listCreate();
+ server.cache_flush_delay = 0;
+
createSharedObjects();
server.el = aeCreateEventLoop();
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
- if (server.vm_enabled)
+ if (server.ds_enabled) {
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
+ server.db[j].io_negcache = dictCreate(&setDictType,NULL);
+ }
server.db[j].id = j;
}
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.stat_numcommands = 0;
server.stat_numconnections = 0;
server.stat_expiredkeys = 0;
+ server.stat_evictedkeys = 0;
server.stat_starttime = time(NULL);
server.stat_keyspace_misses = 0;
server.stat_keyspace_hits = 0;
}
}
- if (server.vm_enabled) vmInit();
+ if (server.ds_enabled) dsInit();
}
/* Populates the Redis Command Table starting from the hard coded list
return REDIS_OK;
}
+ /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
+ * we are a slave with a broken link with master. */
+ if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED &&
+ server.repl_serve_stale_data == 0 &&
+ cmd->proc != infoCommand && cmd->proc != slaveofCommand)
+ {
+ addReplyError(c,
+ "link with MASTER is down and slave-serve-stale-data is set to no");
+ return REDIS_OK;
+ }
+
+ /* Loading DB? Return an error if the command is not INFO */
+ if (server.loading && cmd->proc != infoCommand) {
+ addReply(c, shared.loadingerr);
+ return REDIS_OK;
+ }
+
/* Exec the command */
if (c->flags & REDIS_MULTI &&
cmd->proc != execCommand && cmd->proc != discardCommand &&
queueMultiCommand(c,cmd);
addReply(c,shared.queued);
} else {
- if (server.vm_enabled && server.vm_max_threads > 0 &&
- blockClientOnSwappedKeys(c,cmd)) return REDIS_ERR;
+ if (server.ds_enabled && blockClientOnSwappedKeys(c,cmd))
+ return REDIS_ERR;
call(c,cmd);
}
return REDIS_OK;
kill(server.bgsavechildpid,SIGKILL);
rdbRemoveTempFile(server.bgsavechildpid);
}
- if (server.appendonly) {
+ if (server.ds_enabled) {
+ /* FIXME: flush all objects on disk */
+ } else if (server.appendonly) {
/* Append only file: fsync() the AOF and exit */
aof_fsync(server.appendfd);
- if (server.vm_enabled) unlink(server.vm_swap_file);
} else if (server.saveparamslen > 0) {
/* Snapshotting. Perform a SYNC SAVE and exit */
if (rdbSave(server.dbfilename) != REDIS_OK) {
"used_memory_rss:%zu\r\n"
"mem_fragmentation_ratio:%.2f\r\n"
"use_tcmalloc:%d\r\n"
+ "loading:%d\r\n"
+ "aof_enabled:%d\r\n"
"changes_since_last_save:%lld\r\n"
"bgsave_in_progress:%d\r\n"
"last_save_time:%ld\r\n"
"total_connections_received:%lld\r\n"
"total_commands_processed:%lld\r\n"
"expired_keys:%lld\r\n"
+ "evicted_keys:%lld\r\n"
"keyspace_hits:%lld\r\n"
"keyspace_misses:%lld\r\n"
"hash_max_zipmap_entries:%zu\r\n"
"hash_max_zipmap_value:%zu\r\n"
"pubsub_channels:%ld\r\n"
"pubsub_patterns:%u\r\n"
- "vm_enabled:%d\r\n"
+ "ds_enabled:%d\r\n"
"role:%s\r\n"
,REDIS_VERSION,
redisGitSHA1(),
(float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
- server.blpop_blocked_clients,
+ server.bpop_blocked_clients,
zmalloc_used_memory(),
hmem,
zmalloc_get_rss(),
#else
0,
#endif
+ server.loading,
+ server.appendonly,
server.dirty,
server.bgsavechildpid != -1,
server.lastsave,
server.stat_numconnections,
server.stat_numcommands,
server.stat_expiredkeys,
+ server.stat_evictedkeys,
server.stat_keyspace_hits,
server.stat_keyspace_misses,
server.hash_max_zipmap_entries,
server.hash_max_zipmap_value,
dictSize(server.pubsub_channels),
listLength(server.pubsub_patterns),
- server.vm_enabled != 0,
+ server.ds_enabled != 0,
server.masterhost == NULL ? "master" : "slave"
);
if (server.masterhost) {
"master_port:%d\r\n"
"master_link_status:%s\r\n"
"master_last_io_seconds_ago:%d\r\n"
+ "master_sync_in_progress:%d\r\n"
,server.masterhost,
server.masterport,
(server.replstate == REDIS_REPL_CONNECTED) ?
"up" : "down",
- server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
+ server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1,
+ server.replstate == REDIS_REPL_TRANSFER
);
+
+ if (server.replstate == REDIS_REPL_TRANSFER) {
+ info = sdscatprintf(info,
+ "master_sync_left_bytes:%ld\r\n"
+ "master_sync_last_io_seconds_ago:%d\r\n"
+ ,(long)server.repl_transfer_left,
+ (int)(time(NULL)-server.repl_transfer_lastio)
+ );
+ }
}
- if (server.vm_enabled) {
+ if (server.ds_enabled) {
lockThreadedIO();
info = sdscatprintf(info,
- "vm_conf_max_memory:%llu\r\n"
- "vm_conf_page_size:%llu\r\n"
- "vm_conf_pages:%llu\r\n"
- "vm_stats_used_pages:%llu\r\n"
- "vm_stats_swapped_objects:%llu\r\n"
- "vm_stats_swappin_count:%llu\r\n"
- "vm_stats_swappout_count:%llu\r\n"
- "vm_stats_io_newjobs_len:%lu\r\n"
- "vm_stats_io_processing_len:%lu\r\n"
- "vm_stats_io_processed_len:%lu\r\n"
- "vm_stats_io_active_threads:%lu\r\n"
- "vm_stats_blocked_clients:%lu\r\n"
- ,(unsigned long long) server.vm_max_memory,
- (unsigned long long) server.vm_page_size,
- (unsigned long long) server.vm_pages,
- (unsigned long long) server.vm_stats_used_pages,
- (unsigned long long) server.vm_stats_swapped_objects,
- (unsigned long long) server.vm_stats_swapins,
- (unsigned long long) server.vm_stats_swapouts,
- (unsigned long) listLength(server.io_newjobs),
- (unsigned long) listLength(server.io_processing),
- (unsigned long) listLength(server.io_processed),
- (unsigned long) server.io_active_threads,
- (unsigned long) server.vm_blocked_clients
+ "cache_max_memory:%llu\r\n"
+ "cache_blocked_clients:%lu\r\n"
+ ,(unsigned long long) server.cache_max_memory,
+ (unsigned long) server.cache_blocked_clients
);
unlockThreadedIO();
}
+ if (server.loading) {
+     /* Report the progress of the dataset load: raw byte counters, the
+      * percentage already loaded and an ETA obtained by linearly
+      * extrapolating the time spent so far over the remaining bytes. */
+     double perc = 0;
+     time_t eta, elapsed;
+     off_t remaining_bytes = server.loading_total_bytes-
+                             server.loading_loaded_bytes;
+
+     /* With a zero total the division would yield NaN: report 0% instead. */
+     if (server.loading_total_bytes > 0) {
+         perc = ((double)server.loading_loaded_bytes /
+                 server.loading_total_bytes) * 100;
+     }
+
+     elapsed = time(NULL)-server.loading_start_time;
+     if (elapsed == 0 || server.loading_loaded_bytes == 0) {
+         /* A fake 1 second figure if we don't have enough info. The check
+          * on loaded bytes also prevents an integer division by zero when
+          * INFO is served before the first byte has been accounted. */
+         eta = 1;
+     } else {
+         eta = (elapsed*remaining_bytes)/server.loading_loaded_bytes;
+     }
+
+     /* Casts match the conversion specifiers exactly: %ld takes a long,
+      * and time_t is not guaranteed to be one. */
+     info = sdscatprintf(info,
+         "loading_start_time:%ld\r\n"
+         "loading_total_bytes:%llu\r\n"
+         "loading_loaded_bytes:%llu\r\n"
+         "loading_loaded_perc:%.2f\r\n"
+         "loading_eta_seconds:%ld\r\n"
+         ,(long) server.loading_start_time,
+         (unsigned long long) server.loading_total_bytes,
+         (unsigned long long) server.loading_loaded_bytes,
+         perc,
+         (long) eta
+     );
+ }
for (j = 0; j < server.dbnum; j++) {
long long keys, vkeys;
/* ============================ Maxmemory directive ======================== */
-/* Try to free one object form the pre-allocated objects free list.
- * This is useful under low mem conditions as by default we take 1 million
- * free objects allocated. On success REDIS_OK is returned, otherwise
- * REDIS_ERR. */
-int tryFreeOneObjectFromFreelist(void) {
- robj *o;
-
- if (server.vm_enabled) pthread_mutex_lock(&server.obj_freelist_mutex);
- if (listLength(server.objfreelist)) {
- listNode *head = listFirst(server.objfreelist);
- o = listNodeValue(head);
- listDelNode(server.objfreelist,head);
- if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex);
- zfree(o);
- return REDIS_OK;
- } else {
- if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex);
- return REDIS_ERR;
- }
-}
-
/* This function gets called when 'maxmemory' is set on the config file to limit
* the max memory used by the server, and we are out of memory.
* This function will try to, in order:
void freeMemoryIfNeeded(void) {
/* Remove keys accordingly to the active policy as long as we are
* over the memory limit. */
+ if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION) return;
+
while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
int j, k, freed = 0;
- /* Basic strategy -- remove objects from the free list. */
- if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
-
for (j = 0; j < server.dbnum; j++) {
long bestval = 0; /* just to prevent warning */
sds bestkey = NULL;
de = dictGetRandomKey(dict);
thiskey = dictGetEntryKey(de);
+ /* When policy is volatile-lru we need an additional lookup
+ * to locate the real key, as dict is set to db->expires. */
+ if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
+ de = dictFind(db->dict, thiskey);
o = dictGetEntryVal(de);
thisval = estimateObjectIdleTime(o);
if (bestkey) {
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
dbDelete(db,keyobj);
- server.stat_expiredkeys++;
+ server.stat_evictedkeys++;
decrRefCount(keyobj);
freed++;
}
}
if (!freed) return; /* nothing to free... */
}
-
- while(0) {
- int j, k, freed = 0;
- for (j = 0; j < server.dbnum; j++) {
- int minttl = -1;
- sds minkey = NULL;
- robj *keyobj = NULL;
- struct dictEntry *de;
-
- if (dictSize(server.db[j].expires)) {
- freed = 1;
- /* From a sample of three keys drop the one nearest to
- * the natural expire */
- for (k = 0; k < 3; k++) {
- time_t t;
-
- de = dictGetRandomKey(server.db[j].expires);
- t = (time_t) dictGetEntryVal(de);
- if (minttl == -1 || t < minttl) {
- minkey = dictGetEntryKey(de);
- minttl = t;
- }
- }
- keyobj = createStringObject(minkey,sdslen(minkey));
- dbDelete(server.db+j,keyobj);
- server.stat_expiredkeys++;
- decrRefCount(keyobj);
- }
- }
- if (!freed) return; /* nothing to free... */
- }
}
/* =================================== Main! ================================ */
linuxOvercommitMemoryWarning();
#endif
start = time(NULL);
- if (server.appendonly) {
+ if (server.ds_enabled) {
+ redisLog(REDIS_NOTICE,"DB not loaded (running with disk back end)");
+ } else if (server.appendonly) {
if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
redisLog(REDIS_NOTICE,"DB loaded from append only file: %ld seconds",time(NULL)-start);
} else {