X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/b72f6a4b70ef642a085f700243ebf885ca7b09f4..7c775e09433249e9d64c24ff92d2e5b716f5eb2d:/redis.c diff --git a/redis.c b/redis.c index 21c547c0..a961c247 100644 --- a/redis.c +++ b/redis.c @@ -172,13 +172,12 @@ #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 /* Client flags */ -#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */ -#define REDIS_SLAVE 2 /* This client is a slave server */ -#define REDIS_MASTER 4 /* This client is a master server */ -#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */ -#define REDIS_MULTI 16 /* This client is in a MULTI context */ -#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */ -#define REDIS_IO_WAIT 64 /* The client is waiting for Virtual Memory I/O */ +#define REDIS_SLAVE 1 /* This client is a slave server */ +#define REDIS_MASTER 2 /* This client is a master server */ +#define REDIS_MONITOR 4 /* This client is a slave monitor, see MONITOR */ +#define REDIS_MULTI 8 /* This client is in a MULTI context */ +#define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */ +#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ /* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ @@ -222,7 +221,7 @@ #define APPENDFSYNC_EVERYSEC 2 /* We can print the stacktrace, so our assert is defined this way: */ -#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),exit(1))) +#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) static void _redisAssert(char *estr, char *file, int line); /*================================= Data types ============================== */ @@ -269,6 +268,7 @@ typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */ + dict *io_keys; /* 
Keys with clients waiting for VM I/O */ int id; } redisDb; @@ -298,8 +298,7 @@ typedef struct redisClient { list *reply; int sentlen; time_t lastinteraction; /* time of the last interaction, used for timeout */ - int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */ - /* REDIS_MULTI */ + int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */ int slaveseldb; /* slave selected db, if this client is a slave */ int authenticated; /* when requirepass is non-NULL */ int replstate; /* replication state if this is a slave */ @@ -307,7 +306,7 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj **blockingkeys; /* The key we waiting to terminate a blocking + robj **blockingkeys; /* The key we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ int blockingkeysnum; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time @@ -373,7 +372,8 @@ struct redisServer { int replstate; unsigned int maxclients; unsigned long long maxmemory; - unsigned int blockedclients; + unsigned int blpop_blocked_clients; + unsigned int vm_blocked_clients; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -399,7 +399,7 @@ struct redisServer { list *io_newjobs; /* List of VM I/O jobs yet to be processed */ list *io_processing; /* List of VM I/O jobs being processed */ list *io_processed; /* List of VM I/O jobs already processed */ - list *io_clients; /* All the clients waiting for SWAP I/O operations */ + list *io_ready_clients; /* Clients ready to be unblocked. 
All keys loaded */ pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */ pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */ @@ -426,6 +426,10 @@ struct redisCommand { redisCommandProc *proc; int arity; int flags; + /* What keys should be loaded in background when calling this command? */ + int vm_firstkey; /* The first argument that's a key (0 = no keys) */ + int vm_lastkey; /* The last argument that's a key */ + int vm_keystep; /* The step between first and last key */ }; struct redisFunctionSym { @@ -487,7 +491,7 @@ static double R_Zero, R_PosInf, R_NegInf, R_Nan; #define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */ #define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */ #define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */ -typedef struct iojon { +typedef struct iojob { int type; /* Request type, REDIS_IOJOB_* */ redisDb *db;/* Redis database */ robj *key; /* This I/O request is about swapping this key */ @@ -565,6 +569,13 @@ static robj *vmReadObjectFromSwap(off_t page, int type); static void waitEmptyIOJobsQueue(void); static void vmReopenSwapFile(void); static int vmFreePage(off_t page); +static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c); +static int dontWaitForSwappedKey(redisClient *c, robj *key); +static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); +static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); +static struct redisCommand *lookupCommand(char *name); +static void call(redisClient *c, struct redisCommand *cmd); +static void resetClient(redisClient *c); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); @@ -634,6 +645,7 @@ static void zaddCommand(redisClient *c); static void zincrbyCommand(redisClient *c); static void zrangeCommand(redisClient *c); static void zrangebyscoreCommand(redisClient *c); +static void 
zcountCommand(redisClient *c); static void zrevrangeCommand(redisClient *c); static void zcardCommand(redisClient *c); static void zremCommand(redisClient *c); @@ -643,91 +655,94 @@ static void multiCommand(redisClient *c); static void execCommand(redisClient *c); static void blpopCommand(redisClient *c); static void brpopCommand(redisClient *c); +static void appendCommand(redisClient *c); /*================================= Globals ================================= */ /* Global vars */ static struct redisServer server; /* server global state */ static struct redisCommand cmdTable[] = { - {"get",getCommand,2,REDIS_CMD_INLINE}, - {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"del",delCommand,-2,REDIS_CMD_INLINE}, - {"exists",existsCommand,2,REDIS_CMD_INLINE}, - {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"mget",mgetCommand,-2,REDIS_CMD_INLINE}, - {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"rpop",rpopCommand,2,REDIS_CMD_INLINE}, - {"lpop",lpopCommand,2,REDIS_CMD_INLINE}, - {"brpop",brpopCommand,-3,REDIS_CMD_INLINE}, - {"blpop",blpopCommand,-3,REDIS_CMD_INLINE}, - {"llen",llenCommand,2,REDIS_CMD_INLINE}, - {"lindex",lindexCommand,3,REDIS_CMD_INLINE}, - {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"lrange",lrangeCommand,4,REDIS_CMD_INLINE}, - {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE}, - {"lrem",lremCommand,4,REDIS_CMD_BULK}, - {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"srem",sremCommand,3,REDIS_CMD_BULK}, - {"smove",smoveCommand,4,REDIS_CMD_BULK}, - {"sismember",sismemberCommand,3,REDIS_CMD_BULK}, - {"scard",scardCommand,2,REDIS_CMD_INLINE}, - {"spop",spopCommand,2,REDIS_CMD_INLINE}, - {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE}, 
- {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"smembers",sinterCommand,2,REDIS_CMD_INLINE}, - {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"zrem",zremCommand,3,REDIS_CMD_BULK}, - {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE}, - {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE}, - {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE}, - {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE}, - {"zcard",zcardCommand,2,REDIS_CMD_INLINE}, - {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, - {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE}, - {"select",selectCommand,2,REDIS_CMD_INLINE}, - {"move",moveCommand,3,REDIS_CMD_INLINE}, - {"rename",renameCommand,3,REDIS_CMD_INLINE}, - {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE}, - {"expire",expireCommand,3,REDIS_CMD_INLINE}, - {"expireat",expireatCommand,3,REDIS_CMD_INLINE}, - {"keys",keysCommand,2,REDIS_CMD_INLINE}, - {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE}, - {"auth",authCommand,2,REDIS_CMD_INLINE}, - {"ping",pingCommand,1,REDIS_CMD_INLINE}, - {"echo",echoCommand,2,REDIS_CMD_BULK}, - {"save",saveCommand,1,REDIS_CMD_INLINE}, - {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE}, - {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE}, - 
{"shutdown",shutdownCommand,1,REDIS_CMD_INLINE}, - {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE}, - {"type",typeCommand,2,REDIS_CMD_INLINE}, - {"multi",multiCommand,1,REDIS_CMD_INLINE}, - {"exec",execCommand,1,REDIS_CMD_INLINE}, - {"sync",syncCommand,1,REDIS_CMD_INLINE}, - {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE}, - {"flushall",flushallCommand,1,REDIS_CMD_INLINE}, - {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, - {"info",infoCommand,1,REDIS_CMD_INLINE}, - {"monitor",monitorCommand,1,REDIS_CMD_INLINE}, - {"ttl",ttlCommand,2,REDIS_CMD_INLINE}, - {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE}, - {"debug",debugCommand,-2,REDIS_CMD_INLINE}, - {NULL,NULL,0,0} + {"get",getCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,0,0,0}, + {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,0,0,0}, + {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, + {"del",delCommand,-2,REDIS_CMD_INLINE,0,0,0}, + {"exists",existsCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1}, + {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1}, + {"mget",mgetCommand,-2,REDIS_CMD_INLINE,1,-1,1}, + {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, + {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, + {"rpop",rpopCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"lpop",lpopCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"brpop",brpopCommand,-3,REDIS_CMD_INLINE,1,1,1}, + {"blpop",blpopCommand,-3,REDIS_CMD_INLINE,1,1,1}, + {"llen",llenCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"lindex",lindexCommand,3,REDIS_CMD_INLINE,1,1,1}, + {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, + {"lrange",lrangeCommand,4,REDIS_CMD_INLINE,1,1,1}, + {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE,1,1,1}, + {"lrem",lremCommand,4,REDIS_CMD_BULK,1,1,1}, + {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,2,1}, + 
{"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, + {"srem",sremCommand,3,REDIS_CMD_BULK,1,1,1}, + {"smove",smoveCommand,4,REDIS_CMD_BULK,1,2,1}, + {"sismember",sismemberCommand,3,REDIS_CMD_BULK,1,1,1}, + {"scard",scardCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"spop",spopCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,-1,1}, + {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,2,-1,1}, + {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,-1,1}, + {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,2,-1,1}, + {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,-1,1}, + {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,2,-1,1}, + {"smembers",sinterCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, + {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, + {"zrem",zremCommand,3,REDIS_CMD_BULK,1,1,1}, + {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,1,1,1}, + {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,1,1,1}, + {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,1,1,1}, + {"zcount",zcountCommand,4,REDIS_CMD_INLINE,1,1,1}, + {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE,1,1,1}, + {"zcard",zcardCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, + {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1}, + {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1}, + {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, + {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,-1,2}, + {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,-1,2}, + {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"select",selectCommand,2,REDIS_CMD_INLINE,0,0,0}, + 
{"move",moveCommand,3,REDIS_CMD_INLINE,1,1,1}, + {"rename",renameCommand,3,REDIS_CMD_INLINE,1,1,1}, + {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE,1,1,1}, + {"expire",expireCommand,3,REDIS_CMD_INLINE,0,0,0}, + {"expireat",expireatCommand,3,REDIS_CMD_INLINE,0,0,0}, + {"keys",keysCommand,2,REDIS_CMD_INLINE,0,0,0}, + {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"auth",authCommand,2,REDIS_CMD_INLINE,0,0,0}, + {"ping",pingCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"echo",echoCommand,2,REDIS_CMD_BULK,0,0,0}, + {"save",saveCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"type",typeCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"multi",multiCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"exec",execCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"sync",syncCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"flushall",flushallCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1}, + {"info",infoCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"monitor",monitorCommand,1,REDIS_CMD_INLINE,0,0,0}, + {"ttl",ttlCommand,2,REDIS_CMD_INLINE,1,1,1}, + {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,0,0,0}, + {"debug",debugCommand,-2,REDIS_CMD_INLINE,0,0,0}, + {NULL,NULL,0,0,0,0,0} }; /*============================ Utility functions ============================ */ @@ -864,7 +879,7 @@ static void redisLog(int level, const char *fmt, ...) 
{ va_start(ap, fmt); if (level >= server.verbosity) { - char *c = ".-*"; + char *c = ".-*#"; char buf[64]; time_t now; @@ -947,10 +962,24 @@ static int dictEncObjKeyCompare(void *privdata, const void *key1, static unsigned int dictEncObjHash(const void *key) { robj *o = (robj*) key; - o = getDecodedObject(o); - unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); - decrRefCount(o); - return hash; + if (o->encoding == REDIS_ENCODING_RAW) { + return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); + } else { + if (o->encoding == REDIS_ENCODING_INT) { + char buf[32]; + int len; + + len = snprintf(buf,32,"%ld",(long)o->ptr); + return dictGenHashFunction((unsigned char*)buf, len); + } else { + unsigned int hash; + + o = getDecodedObject(o); + hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); + decrRefCount(o); + return hash; + } + } } /* Sets type and expires */ @@ -994,7 +1023,8 @@ static dictType keyptrDictType = { }; /* Keylist hash table type has unencoded redis objects as keys and - * lists as values. It's used for blocking operations (BLPOP) */ + * lists as values. It's used for blocking operations (BLPOP) and to + * map swapped keys to a list of clients waiting for these keys to be loaded. 
*/ static dictType keylistDictType = { dictObjHash, /* hash function */ NULL, /* key dup */ @@ -1195,7 +1225,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD } /* Close connections of timedout clients */ - if ((server.maxidletime && !(loops % 10)) || server.blockedclients) + if ((server.maxidletime && !(loops % 10)) || server.blpop_blocked_clients) closeTimedoutClients(); /* Check if a background saving or AOF rewrite in progress terminated */ @@ -1294,6 +1324,38 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD return 1000; } +/* This function gets called every time Redis is entering the + * main loop of the event driven library, that is, before to sleep + * for ready file descriptors. */ +static void beforeSleep(struct aeEventLoop *eventLoop) { + REDIS_NOTUSED(eventLoop); + + if (server.vm_enabled && listLength(server.io_ready_clients)) { + listIter li; + listNode *ln; + + listRewind(server.io_ready_clients,&li); + while((ln = listNext(&li))) { + redisClient *c = ln->value; + struct redisCommand *cmd; + + /* Resume the client. */ + listDelNode(server.io_ready_clients,ln); + c->flags &= (~REDIS_IO_WAIT); + server.vm_blocked_clients--; + aeCreateFileEvent(server.el, c->fd, AE_READABLE, + readQueryFromClient, c); + cmd = lookupCommand(c->argv[0]->ptr); + assert(cmd != NULL); + call(c,cmd); + resetClient(c); + /* There may be more data to process in the input buffer. 
*/ + if (c->querybuf && sdslen(c->querybuf) > 0) + processInputBuffer(c); + } + } +} + static void createSharedObjects(void) { shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n")); shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n")); @@ -1367,7 +1429,7 @@ static void initServerConfig() { server.rdbcompression = 1; server.sharingpoolsize = 1024; server.maxclients = 0; - server.blockedclients = 0; + server.blpop_blocked_clients = 0; server.maxmemory = 0; server.vm_enabled = 0; server.vm_swap_file = zstrdup("/tmp/redis-%p.vm"); @@ -1375,6 +1437,7 @@ static void initServerConfig() { server.vm_pages = 1024*1024*100; /* 104 millions of pages */ server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */ server.vm_max_threads = 4; + server.vm_blocked_clients = 0; resetServerSaveParams(); @@ -1425,6 +1488,8 @@ static void initServer() { server.db[j].dict = dictCreate(&hashDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL); + if (server.vm_enabled) + server.db[j].io_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; } server.cronloops = 0; @@ -1685,11 +1750,17 @@ static void freeClient(redisClient *c) { ln = listSearchKey(server.clients,c); redisAssert(ln != NULL); listDelNode(server.clients,ln); - /* Remove from the list of clients waiting for VM operations */ - if (server.vm_enabled && listLength(c->io_keys)) { - ln = listSearchKey(server.io_clients,c); - if (ln) listDelNode(server.io_clients,ln); - listRelease(c->io_keys); + /* Remove from the list of clients waiting for swapped keys */ + if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) { + ln = listSearchKey(server.io_ready_clients,c); + if (ln) { + listDelNode(server.io_ready_clients,ln); + server.vm_blocked_clients--; + } + } + while (server.vm_enabled && listLength(c->io_keys)) { + ln = listFirst(c->io_keys); + dontWaitForSwappedKey(c,ln->value); } listRelease(c->io_keys); /* Other cleanup */ @@ 
-2002,6 +2073,9 @@ static int processCommand(redisClient *c) { freeClient(c); return 0; } + + /* Now lookup the command and check ASAP about trivial error conditions + * such as wrong arity, bad command name and so forth. */ cmd = lookupCommand(c->argv[0]->ptr); if (!cmd) { addReplySds(c, @@ -2022,6 +2096,7 @@ static int processCommand(redisClient *c) { resetClient(c); return 1; } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { + /* This is a bulk command, we have to read the last argument yet. */ int bulklen = atoi(c->argv[c->argc-1]->ptr); decrRefCount(c->argv[c->argc-1]); @@ -2043,6 +2118,8 @@ static int processCommand(redisClient *c) { c->argc++; c->querybuf = sdsrange(c->querybuf,c->bulklen,-1); } else { + /* Otherwise return... we still have to read the last argument + * from the socket. */ return 1; } } @@ -2068,14 +2145,12 @@ static int processCommand(redisClient *c) { queueMultiCommand(c,cmd); addReply(c,shared.queued); } else { + if (server.vm_enabled && server.vm_max_threads > 0 && + blockClientOnSwappedKeys(cmd,c)) return 1; call(c,cmd); } /* Prepare the client for the next command */ - if (c->flags & REDIS_CLOSE) { - freeClient(c); - return 0; - } resetClient(c); return 1; } @@ -2339,6 +2414,14 @@ static void addReplyDouble(redisClient *c, double d) { (unsigned long) strlen(buf),buf)); } +static void addReplyLong(redisClient *c, long l) { + char buf[128]; + size_t len; + + len = snprintf(buf,sizeof(buf),":%ld\r\n",l); + addReplySds(c,sdsnewlen(buf,len)); +} + static void addReplyBulkLen(redisClient *c, robj *obj) { size_t len; @@ -2550,10 +2633,16 @@ static robj *lookupKey(redisDb *db, robj *key) { /* Update the access time of the key for the aging algorithm. */ key->vm.atime = server.unixtime; } else { + int notify = (key->storage == REDIS_VM_LOADING); + /* Our value was swapped on disk. Bring it at home. 
*/ redisAssert(val == NULL); val = vmLoadObject(key); dictGetEntryVal(de) = val; + + /* Clients blocked by the VM subsystem may be waiting for + * this key... */ + if (notify) handleClientsBlockedOnSwappedKey(db,key); } } return val; @@ -3104,9 +3193,9 @@ static int rdbSaveBackground(char *filename) { if (server.vm_enabled) vmReopenSwapFile(); close(server.fd); if (rdbSave(filename) == REDIS_OK) { - exit(0); + _exit(0); } else { - exit(1); + _exit(1); } } else { /* Parent */ @@ -3276,6 +3365,10 @@ static robj *rdbLoadObject(int type, FILE *fp) { if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; o = (type == REDIS_LIST) ? createListObject() : createSetObject(); + /* It's faster to expand the dict to the right size asap in order + * to avoid rehashing */ + if (type == REDIS_SET && listlen > DICT_HT_INITIAL_SIZE) + dictExpand(o->ptr,listlen); /* Load every single element of the list/set */ while(listlen--) { robj *ele; @@ -3618,6 +3711,52 @@ static void decrbyCommand(redisClient *c) { incrDecrCommand(c,-incr); } +static void appendCommand(redisClient *c) { + int retval; + size_t totlen; + robj *o; + + o = lookupKeyWrite(c->db,c->argv[1]); + if (o == NULL) { + /* Create the key */ + retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); + incrRefCount(c->argv[1]); + incrRefCount(c->argv[2]); + totlen = stringObjectLen(c->argv[2]); + } else { + dictEntry *de; + + de = dictFind(c->db->dict,c->argv[1]); + assert(de != NULL); + + o = dictGetEntryVal(de); + if (o->type != REDIS_STRING) { + addReply(c,shared.wrongtypeerr); + return; + } + /* If the object is specially encoded or shared we have to make + * a copy */ + if (o->refcount != 1 || o->encoding != REDIS_ENCODING_RAW) { + robj *decoded = getDecodedObject(o); + + o = createStringObject(decoded->ptr, sdslen(decoded->ptr)); + decrRefCount(decoded); + dictReplace(c->db->dict,c->argv[1],o); + } + /* APPEND! 
*/ + if (c->argv[2]->encoding == REDIS_ENCODING_RAW) { + o->ptr = sdscatlen(o->ptr, + c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); + } else { + o->ptr = sdscatprintf(o->ptr, "%ld", + (unsigned long) c->argv[2]->ptr); + } + totlen = sdslen(o->ptr); + } + server.dirty++; + addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",(unsigned long)totlen)); +} + /* ========================= Type agnostic commands ========================= */ static void delCommand(redisClient *c) { @@ -5067,28 +5206,64 @@ static void zrevrangeCommand(redisClient *c) { zrangeGenericCommand(c,1); } -static void zrangebyscoreCommand(redisClient *c) { +/* This command implements both ZRANGEBYSCORE and ZCOUNT. + * If justcount is non-zero, just the count is returned. */ +static void genericZrangebyscoreCommand(redisClient *c, int justcount) { robj *o; - double min = strtod(c->argv[2]->ptr,NULL); - double max = strtod(c->argv[3]->ptr,NULL); + double min, max; + int minex = 0, maxex = 0; /* are min or max exclusive? */ int offset = 0, limit = -1; + int withscores = 0; + int badsyntax = 0; + + /* Parse the min-max interval. If one of the values is prefixed + * by the "(" character, it's considered "open". For instance + * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max + * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ + if (((char*)c->argv[2]->ptr)[0] == '(') { + min = strtod((char*)c->argv[2]->ptr+1,NULL); + minex = 1; + } else { + min = strtod(c->argv[2]->ptr,NULL); + } + if (((char*)c->argv[3]->ptr)[0] == '(') { + max = strtod((char*)c->argv[3]->ptr+1,NULL); + maxex = 1; + } else { + max = strtod(c->argv[3]->ptr,NULL); + } - if (c->argc != 4 && c->argc != 7) { + /* Parse "WITHSCORES": note that if the command was called with + * the name ZCOUNT then we are sure that c->argc == 4, so we'll never + * enter the following paths to parse WITHSCORES and LIMIT. 
*/ + if (c->argc == 5 || c->argc == 8) { + if (strcasecmp(c->argv[c->argc-1]->ptr,"withscores") == 0) + withscores = 1; + else + badsyntax = 1; + } + if (c->argc != (4 + withscores) && c->argc != (7 + withscores)) + badsyntax = 1; + if (badsyntax) { addReplySds(c, sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n")); return; - } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) { + } + + /* Parse "LIMIT" */ + if (c->argc == (7 + withscores) && strcasecmp(c->argv[4]->ptr,"limit")) { addReply(c,shared.syntaxerr); return; - } else if (c->argc == 7) { + } else if (c->argc == (7 + withscores)) { offset = atoi(c->argv[5]->ptr); limit = atoi(c->argv[6]->ptr); if (offset < 0) offset = 0; } + /* Ok, lookup the key and get the range */ o = lookupKeyRead(c->db,c->argv[1]); if (o == NULL) { - addReply(c,shared.nullmultibulk); + addReply(c,justcount ? shared.czero : shared.nullmultibulk); } else { if (o->type != REDIS_ZSET) { addReply(c,shared.wrongtypeerr); @@ -5096,14 +5271,17 @@ static void zrangebyscoreCommand(redisClient *c) { zset *zsetobj = o->ptr; zskiplist *zsl = zsetobj->zsl; zskiplistNode *ln; - robj *ele, *lenobj; - unsigned int rangelen = 0; + robj *ele, *lenobj = NULL; + unsigned long rangelen = 0; - /* Get the first node with the score >= min */ + /* Get the first node with the score >= min, or with + * score > min if 'minex' is true. */ ln = zslFirstWithScore(zsl,min); + while (minex && ln && ln->score == min) ln = ln->forward[0]; + if (ln == NULL) { /* No element matching the speciifed interval */ - addReply(c,shared.emptymultibulk); + addReply(c,justcount ? 
shared.czero : shared.emptymultibulk); return; } @@ -5111,30 +5289,49 @@ static void zrangebyscoreCommand(redisClient *c) { * are in the list, so we push this object that will represent * the multi-bulk length in the output buffer, and will "fix" * it later */ - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); + if (!justcount) { + lenobj = createObject(REDIS_STRING,NULL); + addReply(c,lenobj); + decrRefCount(lenobj); + } - while(ln && ln->score <= max) { + while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) { if (offset) { offset--; ln = ln->forward[0]; continue; } if (limit == 0) break; - ele = ln->obj; - addReplyBulkLen(c,ele); - addReply(c,ele); - addReply(c,shared.crlf); + if (!justcount) { + ele = ln->obj; + addReplyBulkLen(c,ele); + addReply(c,ele); + addReply(c,shared.crlf); + if (withscores) + addReplyDouble(c,ln->score); + } ln = ln->forward[0]; rangelen++; if (limit > 0) limit--; } - lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen); + if (justcount) { + addReplyLong(c,(long)rangelen); + } else { + lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n", + withscores ? 
(rangelen*2) : rangelen); + } } } } +static void zrangebyscoreCommand(redisClient *c) { + genericZrangebyscoreCommand(c,0); +} + +static void zcountCommand(redisClient *c) { + genericZrangebyscoreCommand(c,1); +} + static void zcardCommand(redisClient *c) { robj *o; zset *zs; @@ -5618,7 +5815,7 @@ static sds genRedisInfoString(void) { uptime/(3600*24), listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), - server.blockedclients, + server.blpop_blocked_clients, zmalloc_used_memory(), hmem, server.dirty, @@ -5656,8 +5853,8 @@ static sds genRedisInfoString(void) { "vm_stats_io_newjobs_len:%lu\r\n" "vm_stats_io_processing_len:%lu\r\n" "vm_stats_io_processed_len:%lu\r\n" - "vm_stats_io_waiting_clients:%lu\r\n" "vm_stats_io_active_threads:%lu\r\n" + "vm_stats_blocked_clients:%lu\r\n" ,(unsigned long long) server.vm_max_memory, (unsigned long long) server.vm_page_size, (unsigned long long) server.vm_pages, @@ -5668,8 +5865,8 @@ static sds genRedisInfoString(void) { (unsigned long) listLength(server.io_newjobs), (unsigned long) listLength(server.io_processing), (unsigned long) listLength(server.io_processed), - (unsigned long) listLength(server.io_clients), - (unsigned long) server.io_active_threads + (unsigned long) server.io_active_threads, + (unsigned long) server.vm_blocked_clients ); unlockThreadedIO(); } @@ -5942,7 +6139,7 @@ static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeou /* Mark the client as a blocked client */ c->flags |= REDIS_BLOCKED; aeDeleteFileEvent(server.el,c->fd,AE_READABLE); - server.blockedclients++; + server.blpop_blocked_clients++; } /* Unblock a client that's waiting in a blocking operation such as BLPOP */ @@ -5968,7 +6165,7 @@ static void unblockClientWaitingData(redisClient *c) { zfree(c->blockingkeys); c->blockingkeys = NULL; c->flags &= (~REDIS_BLOCKED); - server.blockedclients--; + server.blpop_blocked_clients--; /* Ok now we are ready to get read events from socket, note that we * 
can't trap errors here as it's possible that unblockClientWaitingDatas() is * called from freeClient() itself, and the only thing we can do @@ -6933,9 +7130,9 @@ static int rewriteAppendOnlyFileBackground(void) { close(server.fd); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { - exit(0); + _exit(0); } else { - exit(1); + _exit(1); } } else { /* Parent */ @@ -7061,7 +7258,7 @@ static void vmInit(void) { server.io_newjobs = listCreate(); server.io_processing = listCreate(); server.io_processed = listCreate(); - server.io_clients = listCreate(); + server.io_ready_clients = listCreate(); pthread_mutex_init(&server.io_mutex,NULL); pthread_mutex_init(&server.obj_freelist_mutex,NULL); pthread_mutex_init(&server.io_swapfile_mutex,NULL); @@ -7091,8 +7288,6 @@ static void vmMarkPageUsed(off_t page) { int bit = page&7; redisAssert(vmFreePage(page) == 1); server.vm_bitmap[byte] |= 1< 100000000) { - *((char*)-1) = 'x'; - } + redisLog(REDIS_DEBUG,"Mark FREE pages: %lld pages at %lld\n", + (long long)count, (long long)page); } /* Test if the page is free */ @@ -7174,7 +7368,6 @@ static int vmFindContiguousPages(off_t *first, off_t n) { numfree = 0; } } - redisLog(REDIS_DEBUG, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 
'F' : 'X'); if (vmFreePage(this)) { /* This is a free page */ numfree++; @@ -7182,6 +7375,7 @@ static int vmFindContiguousPages(off_t *first, off_t n) { if (numfree == n) { *first = this-(n-1); server.vm_next_page = this+1; + redisLog(REDIS_DEBUG, "FOUND CONTIGUOUS PAGES: %lld pages at %lld\n", (long long) n, (long long) *first); return REDIS_OK; } } else { @@ -7254,14 +7448,14 @@ static robj *vmReadObjectFromSwap(off_t page, int type) { if (server.vm_enabled) pthread_mutex_lock(&server.io_swapfile_mutex); if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) { redisLog(REDIS_WARNING, - "Unrecoverable VM problem in vmLoadObject(): can't seek: %s", + "Unrecoverable VM problem in vmReadObjectFromSwap(): can't seek: %s", strerror(errno)); - exit(1); + _exit(1); } o = rdbLoadObject(type,server.vm_fp); if (o == NULL) { - redisLog(REDIS_WARNING, "Unrecoverable VM problem in vmLoadObject(): can't load object from swap file: %s", strerror(errno)); - exit(1); + redisLog(REDIS_WARNING, "Unrecoverable VM problem in vmReadObjectFromSwap(): can't load object from swap file: %s", strerror(errno)); + _exit(1); } if (server.vm_enabled) pthread_mutex_unlock(&server.io_swapfile_mutex); return o; @@ -7275,7 +7469,7 @@ static robj *vmReadObjectFromSwap(off_t page, int type) { static robj *vmGenericLoadObject(robj *key, int preview) { robj *val; - redisAssert(key->storage == REDIS_VM_SWAPPED); + redisAssert(key->storage == REDIS_VM_SWAPPED || key->storage == REDIS_VM_LOADING); val = vmReadObjectFromSwap(key->vm.page,key->vtype); if (!preview) { key->storage = REDIS_VM_MEMORY; @@ -7424,10 +7618,7 @@ static int vmSwapOneObject(int usethreads) { } } } - if (best == NULL) { - redisLog(REDIS_DEBUG,"No swappable key found!"); - return REDIS_ERR; - } + if (best == NULL) return REDIS_ERR; key = dictGetEntryKey(best); val = dictGetEntryVal(best); @@ -7485,8 +7676,9 @@ static int deleteIfSwapped(redisDb *db, robj *key) { /* =================== Virtual Memory - Threaded I/O 
======================= */ static void freeIOJob(iojob *j) { - if (j->type == REDIS_IOJOB_PREPARE_SWAP || - j->type == REDIS_IOJOB_DO_SWAP) + if ((j->type == REDIS_IOJOB_PREPARE_SWAP || + j->type == REDIS_IOJOB_DO_SWAP || + j->type == REDIS_IOJOB_LOAD) && j->val != NULL) decrRefCount(j->val); decrRefCount(j->key); zfree(j); @@ -7537,6 +7729,8 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, assert(de != NULL); key = dictGetEntryKey(de); if (j->type == REDIS_IOJOB_LOAD) { + redisDb *db; + /* Key loaded, bring it at home */ key->storage = REDIS_VM_MEMORY; key->vm.atime = server.unixtime; @@ -7545,7 +7739,12 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, (unsigned char*) key->ptr); server.vm_stats_swapped_objects--; server.vm_stats_swapins++; + dictGetEntryVal(de) = j->val; + incrRefCount(j->val); + db = j->db; freeIOJob(j); + /* Handle clients waiting for this key to be loaded. */ + handleClientsBlockedOnSwappedKey(db,key); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { /* Now we know the amount of pages required to swap this object. * Let's find some space for it, and queue this task again @@ -7671,23 +7870,25 @@ again: listDelNode(lists[i],ln); break; case 1: /* io_processing */ - /* Oh Shi- the thread is messing with the Job, and - * probably with the object if this is a - * PREPARE_SWAP or DO_SWAP job. Better to wait for the - * job to move into the next queue... */ - if (job->type != REDIS_IOJOB_LOAD) { - /* Yes, we try again and again until the job - * is completed. */ - unlockThreadedIO(); - /* But let's wait some time for the I/O thread - * to finish with this job. After all this condition - * should be very rare. */ - usleep(1); - goto again; - } else { - job->canceled = 1; - break; - } + /* Oh Shi- the thread is messing with the Job: + * + * Probably it's accessing the object if this is a + * PREPARE_SWAP or DO_SWAP job. 
+ * If it's a LOAD job it may be reading from disk and + * if we don't wait for the job to terminate before to + * cancel it, maybe in a few microseconds data can be + * corrupted in this pages. So the short story is: + * + * Better to wait for the job to move into the + * next queue (processed)... */ + + /* We try again and again until the job is completed. */ + unlockThreadedIO(); + /* But let's wait some time for the I/O thread + * to finish with this job. After all this condition + * should be very rare. */ + usleep(1); + goto again; case 2: /* io_processed */ /* The job was already processed, that's easy... * just mark it as canceled so that we'll ignore it @@ -7740,6 +7941,7 @@ static void *IOThreadEntryPoint(void *arg) { /* Process the Job */ if (j->type == REDIS_IOJOB_LOAD) { + j->val = vmReadObjectFromSwap(j->page,j->key->vtype); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { FILE *fp = fopen("/dev/null","w+"); j->pages = rdbSavedObjectPages(j->val,fp); @@ -7765,8 +7967,15 @@ static void *IOThreadEntryPoint(void *arg) { static void spawnIOThread(void) { pthread_t thread; + sigset_t mask, omask; + sigemptyset(&mask); + sigaddset(&mask,SIGCHLD); + sigaddset(&mask,SIGHUP); + sigaddset(&mask,SIGPIPE); + pthread_sigmask(SIG_SETMASK, &mask, &omask); pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL); + pthread_sigmask(SIG_SETMASK, &omask, NULL); server.io_active_threads++; } @@ -7800,12 +8009,13 @@ static void waitEmptyIOJobsQueue(void) { } static void vmReopenSwapFile(void) { - fclose(server.vm_fp); + /* Note: we don't close the old one as we are in the child process + * and don't want to mess at all with the original file object. */ server.vm_fp = fopen(server.vm_swap_file,"r+b"); if (server.vm_fp == NULL) { redisLog(REDIS_WARNING,"Can't re-open the VM swap file: %s. 
Exiting.", server.vm_swap_file); - exit(1); + _exit(1); } server.vm_fd = fileno(server.vm_fp); } @@ -7843,16 +8053,158 @@ static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { /* ============ Virtual Memory - Blocking clients on missing keys =========== */ +/* This function makes the clinet 'c' waiting for the key 'key' to be loaded. + * If there is not already a job loading the key, it is craeted. + * The key is added to the io_keys list in the client structure, and also + * in the hash table mapping swapped keys to waiting clients, that is, + * server.io_waited_keys. */ +static int waitForSwappedKey(redisClient *c, robj *key) { + struct dictEntry *de; + robj *o; + list *l; + + /* If the key does not exist or is already in RAM we don't need to + * block the client at all. */ + de = dictFind(c->db->dict,key); + if (de == NULL) return 0; + o = dictGetEntryKey(de); + if (o->storage == REDIS_VM_MEMORY) { + return 0; + } else if (o->storage == REDIS_VM_SWAPPING) { + /* We were swapping the key, undo it! */ + vmCancelThreadedIOJob(o); + return 0; + } + + /* OK: the key is either swapped, or being loaded just now. */ + + /* Add the key to the list of keys this client is waiting for. + * This maps clients to keys they are waiting for. */ + listAddNodeTail(c->io_keys,key); + incrRefCount(key); + + /* Add the client to the swapped keys => clients waiting map. */ + de = dictFind(c->db->io_keys,key); + if (de == NULL) { + int retval; + + /* For every key we take a list of clients blocked for it */ + l = listCreate(); + retval = dictAdd(c->db->io_keys,key,l); + incrRefCount(key); + assert(retval == DICT_OK); + } else { + l = dictGetEntryVal(de); + } + listAddNodeTail(l,c); + + /* Are we already loading the key from disk? 
If not create a job */ + if (o->storage == REDIS_VM_SWAPPED) { + iojob *j; + + o->storage = REDIS_VM_LOADING; + j = zmalloc(sizeof(*j)); + j->type = REDIS_IOJOB_LOAD; + j->db = c->db; + j->key = dupStringObject(key); + j->key->vtype = o->vtype; + j->page = o->vm.page; + j->val = NULL; + j->canceled = 0; + j->thread = (pthread_t) -1; + lockThreadedIO(); + queueIOJob(j); + unlockThreadedIO(); + } + return 1; +} + /* Is this client attempting to run a command against swapped keys? - * If so, block it ASAP, load the keys in background, then resume it.4 + * If so, block it ASAP, load the keys in background, then resume it. + * + * The important idea about this function is that it can fail! If keys will + * still be swapped when the client is resumed, this key lookups will + * just block loading keys from disk. In practical terms this should only + * happen with SORT BY command or if there is a bug in this function. * - * The improtat thing about this function is that it can fail! If keys will - * still be swapped when the client is resumed, a few of key lookups will - * just block loading keys from disk. */ -#if 0 -static void blockClientOnSwappedKeys(redisClient *c) { + * Return 1 if the client is marked as blocked, 0 if the client can + * continue as the keys it is going to access appear to be in memory. */ +static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c) { + int j, last; + + if (cmd->vm_firstkey == 0) return 0; + last = cmd->vm_lastkey; + if (last < 0) last = c->argc+last; + for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) + waitForSwappedKey(c,c->argv[j]); + /* If the client was blocked for at least one key, mark it as blocked. */ + if (listLength(c->io_keys)) { + c->flags |= REDIS_IO_WAIT; + aeDeleteFileEvent(server.el,c->fd,AE_READABLE); + server.vm_blocked_clients++; + return 1; + } else { + return 0; + } +} + +/* Remove the 'key' from the list of blocked keys for a given client. 
+ * + * The function returns 1 when there are no longer blocking keys after + * the current one was removed (and the client can be unblocked). */ +static int dontWaitForSwappedKey(redisClient *c, robj *key) { + list *l; + listNode *ln; + listIter li; + struct dictEntry *de; + + /* Remove the key from the list of keys this client is waiting for. */ + listRewind(c->io_keys,&li); + while ((ln = listNext(&li)) != NULL) { + if (compareStringObjects(ln->value,key) == 0) { + listDelNode(c->io_keys,ln); + break; + } + } + assert(ln != NULL); + + /* Remove the client form the key => waiting clients map. */ + de = dictFind(c->db->io_keys,key); + assert(de != NULL); + l = dictGetEntryVal(de); + ln = listSearchKey(l,c); + assert(ln != NULL); + listDelNode(l,ln); + if (listLength(l) == 0) + dictDelete(c->db->io_keys,key); + + return listLength(c->io_keys) == 0; +} + +static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { + struct dictEntry *de; + list *l; + listNode *ln; + int len; + + de = dictFind(db->io_keys,key); + if (!de) return; + + l = dictGetEntryVal(de); + len = listLength(l); + /* Note: we can't use something like while(listLength(l)) as the list + * can be freed by the calling function when we remove the last element. */ + while (len--) { + ln = listFirst(l); + redisClient *c = ln->value; + + if (dontWaitForSwappedKey(c,key)) { + /* Put the client in the list of clients ready to go as we + * loaded all the keys about it. 
*/ + listAddNodeTail(server.io_ready_clients,c); + } + } } -#endif /* ================================= Debugging ============================== */ @@ -7889,8 +8241,8 @@ static void debugCommand(redisClient *c) { } key = dictGetEntryKey(de); val = dictGetEntryVal(de); - if (server.vm_enabled && (key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING)) { + if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY || + key->storage == REDIS_VM_SWAPPING)) { addReplySds(c,sdscatprintf(sdsempty(), "+Key at:%p refcount:%d, value at:%p refcount:%d " "encoding:%d serializedlength:%lld\r\n", @@ -7996,6 +8348,8 @@ static void daemonize(void) { } int main(int argc, char **argv) { + time_t start; + initServerConfig(); if (argc == 2) { resetServerSaveParams(); @@ -8012,14 +8366,16 @@ int main(int argc, char **argv) { #ifdef __linux__ linuxOvercommitMemoryWarning(); #endif + start = time(NULL); if (server.appendonly) { if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK) - redisLog(REDIS_NOTICE,"DB loaded from append only file"); + redisLog(REDIS_NOTICE,"DB loaded from append only file: %ld seconds",time(NULL)-start); } else { if (rdbLoad(server.dbfilename) == REDIS_OK) - redisLog(REDIS_NOTICE,"DB loaded from disk"); + redisLog(REDIS_NOTICE,"DB loaded from disk: %ld seconds",time(NULL)-start); } redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); + aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); aeDeleteEventLoop(server.el); return 0; @@ -8090,7 +8446,7 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) { } } /* free(messages); Don't call free() with possibly corrupted memory. */ - exit(0); + _exit(0); } static void setupSigSegvAction(void) {