X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/8d3e063a0ac13ad10ad47df15cbb6bbc8116bf2f..f18e059e8270a3b1889aa621fa6565dcf6be5f1c:/src/redis.c?ds=sidebyside diff --git a/src/redis.c b/src/redis.c index 30fe807d..fb6eb469 100644 --- a/src/redis.c +++ b/src/redis.c @@ -74,10 +74,14 @@ struct redisCommand readonlyCommandTable[] = { {"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0}, {"setex",setexCommand,4,REDIS_CMD_DENYOOM,NULL,0,0,0}, {"append",appendCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"substr",substrCommand,4,0,NULL,1,1,1}, {"strlen",strlenCommand,2,0,NULL,1,1,1}, {"del",delCommand,-2,0,NULL,0,0,0}, {"exists",existsCommand,2,0,NULL,1,1,1}, + {"setbit",setbitCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"getbit",getbitCommand,3,0,NULL,1,1,1}, + {"setrange",setrangeCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"getrange",getrangeCommand,4,0,NULL,1,1,1}, + {"substr",getrangeCommand,4,0,NULL,1,1,1}, {"incr",incrCommand,2,REDIS_CMD_DENYOOM,NULL,1,1,1}, {"decr",decrCommand,2,REDIS_CMD_DENYOOM,NULL,1,1,1}, {"mget",mgetCommand,-2,0,NULL,1,-1,1}, @@ -89,6 +93,7 @@ struct redisCommand readonlyCommandTable[] = { {"rpop",rpopCommand,2,0,NULL,1,1,1}, {"lpop",lpopCommand,2,0,NULL,1,1,1}, {"brpop",brpopCommand,-3,0,NULL,1,1,1}, + {"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1}, {"blpop",blpopCommand,-3,0,NULL,1,1,1}, {"llen",llenCommand,2,0,NULL,1,1,1}, {"lindex",lindexCommand,3,0,NULL,1,1,1}, @@ -96,7 +101,7 @@ struct redisCommand readonlyCommandTable[] = { {"lrange",lrangeCommand,4,0,NULL,1,1,1}, {"ltrim",ltrimCommand,4,0,NULL,1,1,1}, {"lrem",lremCommand,4,0,NULL,1,1,1}, - {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1}, + {"rpoplpush",rpoplpushCommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1}, {"sadd",saddCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, {"srem",sremCommand,3,0,NULL,1,1,1}, {"smove",smoveCommand,4,0,NULL,1,2,1}, @@ -124,7 +129,7 @@ struct redisCommand readonlyCommandTable[] = { {"zcount",zcountCommand,4,0,NULL,1,1,1}, {"zrevrange",zrevrangeCommand,-4,0,NULL,1,1,1}, {"zcard",zcardCommand,2,0,NULL,1,1,1}, - {"zscore",zscoreCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"zscore",zscoreCommand,3,0,NULL,1,1,1}, {"zrank",zrankCommand,3,0,NULL,1,1,1}, {"zrevrank",zrevrankCommand,3,0,NULL,1,1,1}, {"hset",hsetCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1}, @@ -188,11 +193,13 @@ struct redisCommand readonlyCommandTable[] = { /*============================ Utility functions ============================ */ void redisLog(int level, const char *fmt, ...) { + const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING }; + const char *c = ".-*#"; + time_t now = time(NULL); va_list ap; FILE *fp; - char *c = ".-*#"; char buf[64]; - time_t now; + char msg[REDIS_MAX_LOGMSG_LEN]; if (level < server.verbosity) return; @@ -200,15 +207,16 @@ void redisLog(int level, const char *fmt, ...) { if (!fp) return; va_start(ap, fmt); - now = time(NULL); - strftime(buf,64,"%d %b %H:%M:%S",localtime(&now)); - fprintf(fp,"[%d] %s %c ",(int)getpid(),buf,c[level]); - vfprintf(fp, fmt, ap); - fprintf(fp,"\n"); - fflush(fp); + vsnprintf(msg, sizeof(msg), fmt, ap); va_end(ap); + strftime(buf,sizeof(buf),"%d %b %H:%M:%S",localtime(&now)); + fprintf(fp,"[%d] %s %c %s\n",(int)getpid(),buf,c[level],msg); + fflush(fp); + if (server.logfile) fclose(fp); + + if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg); } /* Redis generally does not try to recover from out of memory conditions @@ -572,7 +580,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } /* Close connections of timedout clients */ - if ((server.maxidletime && !(loops % 100)) || server.blpop_blocked_clients) + if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients) closeTimedoutClients(); /* Check if a background saving or AOF rewrite in progress terminated */ @@ -616,10 +624,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { while (server.vm_enabled && zmalloc_used_memory() > server.vm_max_memory) { - int retval; - - if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue; - retval = (server.vm_max_threads == 0) ? + int retval = (server.vm_max_threads == 0) ? vmSwapOneObjectBlocking() : vmSwapOneObjectThreaded(); if (retval == REDIS_ERR && !(loops % 300) && @@ -636,14 +641,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } - /* Check if we should connect to a MASTER */ - if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) { - redisLog(REDIS_NOTICE,"Connecting to MASTER..."); - if (syncWithMaster() == REDIS_OK) { - redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded"); - if (server.appendonly) rewriteAppendOnlyFileBackground(); - } - } + /* Replication cron function -- used to reconnect to master and + * to detect transfer failures. */ + if (!(loops % 10)) replicationCron(); + return 100; } @@ -652,15 +653,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * for ready file descriptors. */ void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); + listNode *ln; + redisClient *c; /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; - listNode *ln; listRewind(server.io_ready_clients,&li); while((ln = listNext(&li))) { - redisClient *c = ln->value; + c = ln->value; struct redisCommand *cmd; /* Resume the client. */ @@ -678,6 +680,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) { processInputBuffer(c); } } + + /* Try to process pending commands for clients that were just unblocked. */ + while (listLength(server.unblocked_clients)) { + ln = listFirst(server.unblocked_clients); + redisAssert(ln != NULL); + c = ln->value; + listDelNode(server.unblocked_clients,ln); + + /* Process remaining data in the input buffer. */ + if (c->querybuf && sdslen(c->querybuf) > 0) + processInputBuffer(c); + } + /* Write the AOF buffer on disk */ flushAppendOnlyFile(); } @@ -709,6 +724,8 @@ void createSharedObjects(void) { "-ERR source and destination objects are the same\r\n")); shared.outofrangeerr = createObject(REDIS_STRING,sdsnew( "-ERR index out of range\r\n")); + shared.loadingerr = createObject(REDIS_STRING,sdsnew( + "-LOADING Redis is loading the dataset in memory\r\n")); shared.space = createObject(REDIS_STRING,sdsnew(" ")); shared.colon = createObject(REDIS_STRING,sdsnew(":")); shared.plus = createObject(REDIS_STRING,sdsnew("+")); @@ -746,7 +763,11 @@ void initServerConfig() { server.verbosity = REDIS_VERBOSE; server.maxidletime = REDIS_MAXIDLETIME; server.saveparams = NULL; + server.loading = 0; server.logfile = NULL; /* NULL = log on standard output */ + server.syslog_enabled = 0; + server.syslog_ident = zstrdup("redis"); + server.syslog_facility = LOG_LOCAL0; server.glueoutputbuf = 1; server.daemonize = 0; server.appendonly = 0; @@ -762,7 +783,7 @@ void initServerConfig() { server.rdbcompression = 1; server.activerehashing = 1; server.maxclients = 0; - server.blpop_blocked_clients = 0; + server.bpop_blocked_clients = 0; server.maxmemory = 0; server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU; server.maxmemory_samples = 3; @@ -793,6 +814,7 @@ void initServerConfig() { server.masterport = 6379; server.master = NULL; server.replstate = REDIS_REPL_NONE; + server.repl_serve_stale_data = 1; /* Double constants initialization */ R_Zero = 0.0; @@ -816,16 +838,16 @@ void initServer() { signal(SIGPIPE, SIG_IGN); setupSigSegvAction(); - server.mainthread = pthread_self(); - server.devnull = fopen("/dev/null","w"); - if (server.devnull == NULL) { - redisLog(REDIS_WARNING, "Can't open /dev/null: %s", server.neterr); - exit(1); + if (server.syslog_enabled) { + openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, + server.syslog_facility); } + + server.mainthread = pthread_self(); server.clients = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); - server.objfreelist = listCreate(); + server.unblocked_clients = listCreate(); createSharedObjects(); server.el = aeCreateEventLoop(); server.db = zmalloc(sizeof(redisDb)*server.dbnum); @@ -1002,6 +1024,23 @@ int processCommand(redisClient *c) { return REDIS_OK; } + /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and + * we are a slave with a broken link with master. */ + if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED && + server.repl_serve_stale_data == 0 && + cmd->proc != infoCommand && cmd->proc != slaveofCommand) + { + addReplyError(c, + "link with MASTER is down and slave-serve-stale-data is set to no"); + return REDIS_OK; + } + + /* Loading DB? Return an error if the command is not INFO */ + if (server.loading && cmd->proc != infoCommand) { + addReply(c, shared.loadingerr); + return REDIS_OK; + } + /* Exec the command */ if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand && @@ -1129,6 +1168,8 @@ sds genRedisInfoString(void) { "used_memory_rss:%zu\r\n" "mem_fragmentation_ratio:%.2f\r\n" "use_tcmalloc:%d\r\n" + "loading:%d\r\n" + "aof_enabled:%d\r\n" "changes_since_last_save:%lld\r\n" "bgsave_in_progress:%d\r\n" "last_save_time:%ld\r\n" @@ -1159,7 +1200,7 @@ sds genRedisInfoString(void) { (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000, listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), - server.blpop_blocked_clients, + server.bpop_blocked_clients, zmalloc_used_memory(), hmem, zmalloc_get_rss(), @@ -1169,6 +1210,8 @@ sds genRedisInfoString(void) { #else 0, #endif + server.loading, + server.appendonly, server.dirty, server.bgsavechildpid != -1, server.lastsave, @@ -1191,12 +1234,23 @@ sds genRedisInfoString(void) { "master_port:%d\r\n" "master_link_status:%s\r\n" "master_last_io_seconds_ago:%d\r\n" + "master_sync_in_progress:%d\r\n" ,server.masterhost, server.masterport, (server.replstate == REDIS_REPL_CONNECTED) ? "up" : "down", - server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1 + server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1, + server.replstate == REDIS_REPL_TRANSFER ); + + if (server.replstate == REDIS_REPL_TRANSFER) { + info = sdscatprintf(info, + "master_sync_left_bytes:%ld\r\n" + "master_sync_last_io_seconds_ago:%d\r\n" + ,(long)server.repl_transfer_left, + (int)(time(NULL)-server.repl_transfer_lastio) + ); + } } if (server.vm_enabled) { lockThreadedIO(); @@ -1228,6 +1282,35 @@ sds genRedisInfoString(void) { ); unlockThreadedIO(); } + if (server.loading) { + double perc; + time_t eta, elapsed; + off_t remaining_bytes = server.loading_total_bytes- + server.loading_loaded_bytes; + + perc = ((double)server.loading_loaded_bytes / + server.loading_total_bytes) * 100; + + elapsed = time(NULL)-server.loading_start_time; + if (elapsed == 0) { + eta = 1; /* A fake 1 second figure if we don't have enough info */ + } else { + eta = (elapsed*remaining_bytes)/server.loading_loaded_bytes; + } + + info = sdscatprintf(info, + "loading_start_time:%ld\r\n" + "loading_total_bytes:%llu\r\n" + "loading_loaded_bytes:%llu\r\n" + "loading_loaded_perc:%.2f\r\n" + "loading_eta_seconds:%ld\r\n" + ,(unsigned long) server.loading_start_time, + (unsigned long long) server.loading_total_bytes, + (unsigned long long) server.loading_loaded_bytes, + perc, + eta + ); + } for (j = 0; j < server.dbnum; j++) { long long keys, vkeys; @@ -1261,27 +1344,6 @@ void monitorCommand(redisClient *c) { /* ============================ Maxmemory directive ======================== */ -/* Try to free one object form the pre-allocated objects free list. - * This is useful under low mem conditions as by default we take 1 million - * free objects allocated. On success REDIS_OK is returned, otherwise - * REDIS_ERR. */ -int tryFreeOneObjectFromFreelist(void) { - robj *o; - - if (server.vm_enabled) pthread_mutex_lock(&server.obj_freelist_mutex); - if (listLength(server.objfreelist)) { - listNode *head = listFirst(server.objfreelist); - o = listNodeValue(head); - listDelNode(server.objfreelist,head); - if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); - zfree(o); - return REDIS_OK; - } else { - if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); - return REDIS_ERR; - } -} - /* This function gets called when 'maxmemory' is set on the config file to limit * the max memory used by the server, and we are out of memory. * This function will try to, in order: @@ -1296,12 +1358,11 @@ int tryFreeOneObjectFromFreelist(void) { void freeMemoryIfNeeded(void) { /* Remove keys accordingly to the active policy as long as we are * over the memory limit. */ + if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION) return; + while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) { int j, k, freed = 0; - /* Basic strategy -- remove objects from the free list. */ - if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue; - for (j = 0; j < server.dbnum; j++) { long bestval = 0; /* just to prevent warning */ sds bestkey = NULL; @@ -1337,6 +1398,10 @@ void freeMemoryIfNeeded(void) { de = dictGetRandomKey(dict); thiskey = dictGetEntryKey(de); + /* When policy is volatile-lru we need an additonal lookup + * to locate the real key, as dict is set to db->expires. */ + if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) + de = dictFind(db->dict, thiskey); o = dictGetEntryVal(de); thisval = estimateObjectIdleTime(o);