X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/36741b2c818a95e8ef167818271614ee6b1bc414..989a7820ca0cb1b88493797fdecd2e7168558859:/src/redis.c diff --git a/src/redis.c b/src/redis.c index 025394ef..82dade15 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009-2010, Salvatore Sanfilippo + * Copyright (c) 2009-2012, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -110,6 +110,7 @@ struct redisCommand *commandTable; * t: Allow command while a slave has stale data but is not allowed to * server this data. Normally no command is accepted in this condition * but just a few. + * M: Do not automatically propagate the command on MONITOR. */ struct redisCommand redisCommandTable[] = { {"get",getCommand,2,"r",0,NULL,1,1,1,0,0}, @@ -152,7 +153,7 @@ struct redisCommand redisCommandTable[] = { {"sismember",sismemberCommand,3,"r",0,NULL,1,1,1,0,0}, {"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0}, {"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0}, - {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0}, + {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0}, {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0}, {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0}, {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0}, @@ -216,7 +217,7 @@ struct redisCommand redisCommandTable[] = { {"lastsave",lastsaveCommand,1,"r",0,NULL,0,0,0,0,0}, {"type",typeCommand,2,"r",0,NULL,1,1,1,0,0}, {"multi",multiCommand,1,"rs",0,NULL,0,0,0,0,0}, - {"exec",execCommand,1,"s",0,NULL,0,0,0,0,0}, + {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0}, {"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0}, {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0}, {"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0}, @@ -239,8 +240,8 @@ struct redisCommand redisCommandTable[] = { {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0}, {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}, - {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0}, - {"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0}, + {"restore",restoreCommand,-4,"awm",0,NULL,1,1,1,0,0}, + {"migrate",migrateCommand,-6,"aw",0,NULL,0,0,0,0,0}, {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0}, {"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0}, {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0}, @@ -560,6 +561,16 @@ dictType clusterNodesDictType = { NULL /* val destructor */ }; +/* Migrate cache dict type. */ +dictType migrateCacheDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL /* val destructor */ +}; + int htNeedsResize(dict *dict) { long long size, used; @@ -914,8 +925,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (pid == server.rdb_child_pid) { backgroundSaveDoneHandler(exitcode,bysignal); - } else { + } else if (pid == server.aof_child_pid) { backgroundRewriteDoneHandler(exitcode,bysignal); + } else { + redisLog(REDIS_WARNING, + "Warning, detected child with unmatched pid: %ld", + (long)pid); } updateDictResizePolicy(); } @@ -967,16 +982,21 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * to detect transfer failures. */ run_with_period(1000) replicationCron(); - /* Run other sub-systems specific cron jobs */ + /* Run the Redis Cluster cron. */ run_with_period(1000) { if (server.cluster_enabled) clusterCron(); } - /* Run the sentinel timer if we are in sentinel mode. */ + /* Run the Sentinel timer if we are in sentinel mode. */ run_with_period(100) { if (server.sentinel_mode) sentinelTimer(); } + /* Cleanup expired MIGRATE cached sockets. */ + run_with_period(1000) { + migrateCloseTimedoutSockets(); + } + server.cronloops++; return 1000/REDIS_HZ; } @@ -1027,7 +1047,7 @@ void createSharedObjects(void) { shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n")); shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n")); shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew( - "-ERR Operation against a key holding the wrong kind of value\r\n")); + "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")); shared.nokeyerr = createObject(REDIS_STRING,sdsnew( "-ERR no such key\r\n")); shared.syntaxerr = createObject(REDIS_STRING,sdsnew( @@ -1067,6 +1087,7 @@ void createSharedObjects(void) { shared.del = createStringObject("DEL",3); shared.rpop = createStringObject("RPOP",4); shared.lpop = createStringObject("LPOP",4); + shared.lpush = createStringObject("LPUSH",5); for (j = 0; j < REDIS_SHARED_INTEGERS; j++) { shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j); shared.integers[j]->encoding = REDIS_ENCODING_INT; @@ -1143,6 +1164,7 @@ void initServerConfig() { server.lua_time_limit = REDIS_LUA_TIME_LIMIT; server.lua_client = NULL; server.lua_timedout = 0; + server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); updateLRUClock(); resetServerSaveParams(); @@ -1187,6 +1209,8 @@ void initServerConfig() { server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); server.lpushCommand = lookupCommandByCString("lpush"); + server.lpopCommand = lookupCommandByCString("lpop"); + server.rpopCommand = lookupCommandByCString("rpop"); /* Slow log */ server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN; @@ -1262,6 +1286,7 @@ void initServer() { server.slaves = listCreate(); server.monitors = listCreate(); server.unblocked_clients = listCreate(); + server.ready_keys = listCreate(); createSharedObjects(); adjustOpenFilesLimit(); @@ -1292,6 +1317,7 @@ void initServer() { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); + server.db[j].ready_keys = dictCreate(&setDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; } @@ -1343,11 +1369,11 @@ void initServer() { /* 32 bit instances are limited to 4GB of address space, so if there is * no explicit limit in the user provided configuration we set a limit - * at 3.5GB using maxmemory with 'noeviction' policy'. This saves - * useless crashes of the Redis instance. */ + * at 3 GB using maxmemory with 'noeviction' policy'. This avoids + * useless crashes of the Redis instance for out of memory. */ if (server.arch_bits == 32 && server.maxmemory == 0) { - redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now."); - server.maxmemory = 3584LL*(1024*1024); /* 3584 MB = 3.5 GB */ + redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now."); + server.maxmemory = 3072LL*(1024*1024); /* 3 GB */ server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION; } @@ -1381,6 +1407,7 @@ void populateCommandTable(void) { case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break; case 'l': c->flags |= REDIS_CMD_LOADING; break; case 't': c->flags |= REDIS_CMD_STALE; break; + case 'M': c->flags |= REDIS_CMD_SKIP_MONITOR; break; default: redisPanic("Unsupported command flag"); break; } f++; @@ -1456,7 +1483,7 @@ struct redisCommand *lookupCommandByCString(char *s) { } /* Propagate the specified command (in the context of the specified database id) - * to AOF, Slaves and Monitors. + * to AOF and Slaves. * * flags are an xor between: * + REDIS_PROPAGATE_NONE (no propagation of command at all) @@ -1486,8 +1513,12 @@ void call(redisClient *c, int flags) { /* Sent the command to clients in MONITOR mode, only if the commands are * not geneated from reading an AOF. */ - if (listLength(server.monitors) && !server.loading) + if (listLength(server.monitors) && + !server.loading && + !(c->cmd->flags & REDIS_CMD_SKIP_MONITOR)) + { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); + } /* Call the command. */ redisOpArrayInit(&server.also_propagate); @@ -1537,7 +1568,7 @@ void call(redisClient *c, int flags) { } /* If this function gets called we already read a whole - * command, argments are in the client argv/argc fields. + * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the * server for a bulk read from the client. * @@ -1662,6 +1693,7 @@ int processCommand(redisClient *c) { /* Lua script too slow? Only allow commands with REDIS_CMD_STALE flag. */ if (server.lua_timedout && + c->cmd->proc != authCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && @@ -1682,6 +1714,8 @@ int processCommand(redisClient *c) { addReply(c,shared.queued); } else { call(c,REDIS_CALL_FULL); + if (listLength(server.ready_keys)) + handleClientsBlockedOnLists(); } return REDIS_OK; } @@ -1989,7 +2023,7 @@ sds genRedisInfoString(char *section) { "aof_base_size:%lld\r\n" "aof_pending_rewrite:%d\r\n" "aof_buffer_length:%zu\r\n" - "aof_rewrite_buffer_length:%zu\r\n" + "aof_rewrite_buffer_length:%lu\r\n" "aof_pending_bio_fsync:%llu\r\n" "aof_delayed_fsync:%lu\r\n", (long long) server.aof_current_size, @@ -2048,7 +2082,8 @@ sds genRedisInfoString(char *section) { "keyspace_misses:%lld\r\n" "pubsub_channels:%ld\r\n" "pubsub_patterns:%lu\r\n" - "latest_fork_usec:%lld\r\n", + "latest_fork_usec:%lld\r\n" + "migrate_cached_sockets:%ld\r\n", server.stat_numconnections, server.stat_numcommands, getOperationsPerSecond(), @@ -2059,7 +2094,8 @@ sds genRedisInfoString(char *section) { server.stat_keyspace_misses, dictSize(server.pubsub_channels), listLength(server.pubsub_patterns), - server.stat_fork_time); + server.stat_fork_time, + dictSize(server.migrate_cached_sockets)); } /* Replication */ @@ -2101,7 +2137,10 @@ sds genRedisInfoString(char *section) { (long)server.unixtime-server.repl_down_since); } info = sdscatprintf(info, - "slave_priority:%d\r\n", server.slave_priority); + "slave_priority:%d\r\n" + "slave_read_only:%d\r\n", + server.slave_priority, + server.repl_slave_ro); } info = sdscatprintf(info, "connected_slaves:%lu\r\n", @@ -2212,7 +2251,7 @@ void infoCommand(redisClient *c) { } void monitorCommand(redisClient *c) { - /* ignore MONITOR if aleady slave or in monitor mode */ + /* ignore MONITOR if already slave or in monitor mode */ if (c->flags & REDIS_SLAVE) return; c->flags |= (REDIS_SLAVE|REDIS_MONITOR); @@ -2311,7 +2350,7 @@ int freeMemoryIfNeeded(void) { de = dictGetRandomKey(dict); thiskey = dictGetKey(de); - /* When policy is volatile-lru we need an additonal lookup + /* When policy is volatile-lru we need an additional lookup * to locate the real key, as dict is set to db->expires. */ if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) de = dictFind(db->dict, thiskey); @@ -2614,7 +2653,7 @@ int main(int argc, char **argv) { redisAsciiArt(); if (!server.sentinel_mode) { - /* Things only needed when not runnign in Sentinel mode. */ + /* Things only needed when not running in Sentinel mode. */ redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION); #ifdef __linux__ linuxOvercommitMemoryWarning(); @@ -2626,6 +2665,11 @@ int main(int argc, char **argv) { redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); } + /* Warning the user about suspicious maxmemory setting. */ + if (server.maxmemory > 0 && server.maxmemory < 1024*1024) { + redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); + } + aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); aeDeleteEventLoop(server.el);