X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/30dd89b6b72643beafa098344752812029ce7bd9..6205b46387ef47e0fcb9bd6c9f371c9eb5cdd7e5:/redis.c diff --git a/redis.c b/redis.c index b95eb455..92ae07b7 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "1.3.12" +#define REDIS_VERSION "2.1.1" #include "fmacros.h" #include "config.h" @@ -190,6 +190,7 @@ static char* strencoding[] = { #define REDIS_MULTI 8 /* This client is in a MULTI context */ #define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */ #define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ +#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */ /* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ @@ -285,8 +286,9 @@ typedef struct redisObject { typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ - dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */ + dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ dict *io_keys; /* Keys with clients waiting for VM I/O */ + dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -324,13 +326,14 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj **blockingkeys; /* The key we are waiting to terminate a blocking + robj **blocking_keys; /* The key we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ - int blockingkeysnum; /* Number of blocking keys */ + int blocking_keys_num; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time * is >= blockingto then the operation timed out. */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ + list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ } redisClient; @@ -366,6 +369,7 @@ struct redisServer { int daemonize; int appendonly; int appendfsync; + int shutdown_asap; time_t lastfsync; int appendfd; int appendseldb; @@ -631,6 +635,10 @@ static int equalStringObjects(robj *a, robj *b); static void usage(); static int rewriteAppendOnlyFileBackground(void); static int vmSwapObjectBlocking(robj *key, robj *val); +static int prepareForShutdown(); +static void touchWatchedKey(redisDb *db, robj *key); +static void touchWatchedKeysOnFlush(int dbid); +static void unwatchAllKeys(redisClient *c); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); @@ -737,12 +745,15 @@ static void unsubscribeCommand(redisClient *c); static void psubscribeCommand(redisClient *c); static void punsubscribeCommand(redisClient *c); static void publishCommand(redisClient *c); +static void watchCommand(redisClient *c); +static void unwatchCommand(redisClient *c); /*================================= Globals ================================= */ /* Global vars */ static struct redisServer server; /* server global state */ -static struct redisCommand cmdTable[] = { +static struct redisCommand *commandTable; +static struct redisCommand readonlyCommandTable[] = { {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, @@ -849,7 +860,8 @@ static struct redisCommand cmdTable[] = { {"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0}, {"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0}, - {NULL,NULL,0,0,NULL,0,0,0} + {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, + {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0} }; /*============================ Utility functions ============================ */ @@ -1420,6 +1432,13 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD * To access a global var is faster than calling time(NULL) */ server.unixtime = time(NULL); + /* We received a SIGTERM, shutting down here in a safe way, as it is + * not ok doing so inside the signal handler. */ + if (server.shutdown_asap) { + if (prepareForShutdown() == REDIS_OK) exit(0); + redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); + } + /* Show some info about non-empty databases */ for (j = 0; j < server.dbnum; j++) { long long size, used, vkeys; @@ -1687,6 +1706,7 @@ static void initServerConfig() { server.vm_blocked_clients = 0; server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES; server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE; + server.shutdown_asap = 0; resetServerSaveParams(); @@ -1735,7 +1755,8 @@ static void initServer() { for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); - server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL); + server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); + server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); if (server.vm_enabled) server.db[j].io_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; @@ -2001,6 +2022,9 @@ static void freeClient(redisClient *c) { if (c->flags & REDIS_BLOCKED) unblockClientWaitingData(c); + /* UNWATCH all the keys */ + unwatchAllKeys(c); + listRelease(c->watched_keys); /* Unsubscribe from all the pubsub channels */ pubsubUnsubscribeAllChannels(c,0); pubsubUnsubscribeAllPatterns(c,0); @@ -2016,7 +2040,8 @@ static void freeClient(redisClient *c) { ln = listSearchKey(server.clients,c); redisAssert(ln != NULL); listDelNode(server.clients,ln); - /* Remove from the list of clients waiting for swapped keys */ + /* Remove from the list of clients that are now ready to be restarted + * after waiting for swapped keys */ if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) { ln = listSearchKey(server.io_ready_clients,c); if (ln) { @@ -2024,6 +2049,7 @@ static void freeClient(redisClient *c) { server.vm_blocked_clients--; } } + /* Remove from the list of clients waiting for swapped keys */ while (server.vm_enabled && listLength(c->io_keys)) { ln = listFirst(c->io_keys); dontWaitForSwappedKey(c,ln->value); @@ -2221,13 +2247,29 @@ static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int } } +static int qsortRedisCommands(const void *r1, const void *r2) { + return strcasecmp( + ((struct redisCommand*)r1)->name, + ((struct redisCommand*)r2)->name); +} + +static void sortCommandTable() { + /* Copy and sort the read-only version of the command table */ + commandTable = (struct redisCommand*)malloc(sizeof(readonlyCommandTable)); + memcpy(commandTable,readonlyCommandTable,sizeof(readonlyCommandTable)); + qsort(commandTable, + sizeof(readonlyCommandTable)/sizeof(struct redisCommand), + sizeof(struct redisCommand),qsortRedisCommands); +} + static struct redisCommand *lookupCommand(char *name) { - int j = 0; - while(cmdTable[j].name != NULL) { - if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j]; - j++; - } - return NULL; + struct redisCommand tmp = {name,NULL,0,0,NULL,0,0,0}; + return bsearch( + &tmp, + commandTable, + sizeof(readonlyCommandTable)/sizeof(struct redisCommand), + sizeof(struct redisCommand), + qsortRedisCommands); } /* resetClient prepare the client to process the next command */ @@ -2420,7 +2462,10 @@ static int processCommand(redisClient *c) { } /* Exec the command */ - if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) { + if (c->flags & REDIS_MULTI && + cmd->proc != execCommand && cmd->proc != discardCommand && + cmd->proc != multiCommand && cmd->proc != watchCommand) + { queueMultiCommand(c,cmd); addReply(c,shared.queued); } else { @@ -2715,9 +2760,10 @@ static redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blockingkeys = NULL; - c->blockingkeysnum = 0; + c->blocking_keys = NULL; + c->blocking_keys_num = 0; c->io_keys = listCreate(); + c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); @@ -2917,8 +2963,8 @@ static robj *createStringObjectFromLongLong(long long value) { incrRefCount(shared.integers[value]); o = shared.integers[value]; } else { - o = createObject(REDIS_STRING, NULL); if (value >= LONG_MIN && value <= LONG_MAX) { + o = createObject(REDIS_STRING, NULL); o->encoding = REDIS_ENCODING_INT; o->ptr = (void*)((long)value); } else { @@ -3086,6 +3132,7 @@ static robj *lookupKeyRead(redisDb *db, robj *key) { static robj *lookupKeyWrite(redisDb *db, robj *key) { deleteIfVolatile(db,key); + touchWatchedKey(db,key); return lookupKey(db,key); } @@ -4149,6 +4196,41 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ return REDIS_ERR; /* Just to avoid warning */ } +/*================================== Shutdown =============================== */ +static int prepareForShutdown() { + redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); + /* Kill the saving child if there is a background saving in progress. + We want to avoid race conditions, for instance our saving child may + overwrite the synchronous saving did by SHUTDOWN. */ + if (server.bgsavechildpid != -1) { + redisLog(REDIS_WARNING,"There is a live saving child. Killing it!"); + kill(server.bgsavechildpid,SIGKILL); + rdbRemoveTempFile(server.bgsavechildpid); + } + if (server.appendonly) { + /* Append only file: fsync() the AOF and exit */ + fsync(server.appendfd); + if (server.vm_enabled) unlink(server.vm_swap_file); + } else { + /* Snapshotting. Perform a SYNC SAVE and exit */ + if (rdbSave(server.dbfilename) == REDIS_OK) { + if (server.daemonize) + unlink(server.pidfile); + redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); + } else { + /* Ooops.. error saving! The best we can do is to continue + * operating. Note that if there was a background saving process, + * in the next cron() Redis will be notified that the background + * saving aborted, handling special stuff like slaves pending for + * synchronization... */ + redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); + return REDIS_ERR; + } + } + redisLog(REDIS_WARNING,"Server exit now, bye bye..."); + return REDIS_OK; +} + /*================================== Commands =============================== */ static void authCommand(redisClient *c) { @@ -4184,6 +4266,7 @@ static void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj } } + touchWatchedKey(c->db,key); if (nx) deleteIfVolatile(c->db,key); retval = dictAdd(c->db->dict,key,val); if (retval == DICT_ERR) { @@ -4454,6 +4537,7 @@ static void delCommand(redisClient *c) { for (j = 1; j < c->argc; j++) { if (deleteKey(c->db,c->argv[j])) { + touchWatchedKey(c->db,c->argv[j]); server.dirty++; deleted++; } @@ -4588,39 +4672,9 @@ static void bgsaveCommand(redisClient *c) { } static void shutdownCommand(redisClient *c) { - redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); - /* Kill the saving child if there is a background saving in progress. - We want to avoid race conditions, for instance our saving child may - overwrite the synchronous saving did by SHUTDOWN. */ - if (server.bgsavechildpid != -1) { - redisLog(REDIS_WARNING,"There is a live saving child. Killing it!"); - kill(server.bgsavechildpid,SIGKILL); - rdbRemoveTempFile(server.bgsavechildpid); - } - if (server.appendonly) { - /* Append only file: fsync() the AOF and exit */ - fsync(server.appendfd); - if (server.vm_enabled) unlink(server.vm_swap_file); + if (prepareForShutdown() == REDIS_OK) exit(0); - } else { - /* Snapshotting. Perform a SYNC SAVE and exit */ - if (rdbSave(server.dbfilename) == REDIS_OK) { - if (server.daemonize) - unlink(server.pidfile); - redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); - redisLog(REDIS_WARNING,"Server exit now, bye bye..."); - exit(0); - } else { - /* Ooops.. error saving! The best we can do is to continue - * operating. Note that if there was a background saving process, - * in the next cron() Redis will be notified that the background - * saving aborted, handling special stuff like slaves pending for - * synchronization... */ - redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); - addReplySds(c, - sdsnew("-ERR can't quit, problems saving the DB\r\n")); - } - } + addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. Check logs.\r\n")); } static void renameGenericCommand(redisClient *c, int nx) { @@ -4648,6 +4702,7 @@ static void renameGenericCommand(redisClient *c, int nx) { incrRefCount(c->argv[2]); } deleteKey(c->db,c->argv[1]); + touchWatchedKey(c->db,c->argv[2]); server.dirty++; addReply(c,nx ? shared.cone : shared.ok); } @@ -5397,8 +5452,10 @@ static zskiplistNode *zslCreateNode(int level, double score, robj *obj) { zskiplistNode *zn = zmalloc(sizeof(*zn)); zn->forward = zmalloc(sizeof(zskiplistNode*) * level); - if (level > 0) + if (level > 1) zn->span = zmalloc(sizeof(unsigned int) * (level - 1)); + else + zn->span = NULL; zn->score = score; zn->obj = obj; return zn; @@ -5693,6 +5750,11 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor zset *zs; double *score; + if (isnan(scoreval)) { + addReplySds(c,sdsnew("-ERR provide score is Not A Number (nan)\r\n")); + return; + } + zsetobj = lookupKeyWrite(c->db,key); if (zsetobj == NULL) { zsetobj = createZsetObject(); @@ -5721,6 +5783,15 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor } else { *score = scoreval; } + if (isnan(*score)) { + addReplySds(c, + sdsnew("-ERR resulting score is Not A Number (nan)\r\n")); + zfree(score); + /* Note that we don't need to check if the zset may be empty and + * should be removed here, as we can only obtain Nan as score if + * there was already an element in the sorted set. */ + return; + } } else { *score = scoreval; } @@ -5884,6 +5955,7 @@ static int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) { #define REDIS_AGGR_SUM 1 #define REDIS_AGGR_MIN 2 #define REDIS_AGGR_MAX 3 +#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e)) inline static void zunionInterAggregate(double *target, double val, int aggregate) { if (aggregate == REDIS_AGGR_SUM) { @@ -5899,7 +5971,7 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat } static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { - int i, j, zsetnum; + int i, j, setnum; int aggregate = REDIS_AGGR_SUM; zsetopsrc *src; robj *dstobj; @@ -5907,32 +5979,35 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictIterator *di; dictEntry *de; - /* expect zsetnum input keys to be given */ - zsetnum = atoi(c->argv[2]->ptr); - if (zsetnum < 1) { + /* expect setnum input keys to be given */ + setnum = atoi(c->argv[2]->ptr); + if (setnum < 1) { addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n")); return; } /* test if the expected number of keys would overflow */ - if (3+zsetnum > c->argc) { + if (3+setnum > c->argc) { addReply(c,shared.syntaxerr); return; } /* read keys to be used for input */ - src = zmalloc(sizeof(zsetopsrc) * zsetnum); - for (i = 0, j = 3; i < zsetnum; i++, j++) { - robj *zsetobj = lookupKeyWrite(c->db,c->argv[j]); - if (!zsetobj) { + src = zmalloc(sizeof(zsetopsrc) * setnum); + for (i = 0, j = 3; i < setnum; i++, j++) { + robj *obj = lookupKeyWrite(c->db,c->argv[j]); + if (!obj) { src[i].dict = NULL; } else { - if (zsetobj->type != REDIS_ZSET) { + if (obj->type == REDIS_ZSET) { + src[i].dict = ((zset*)obj->ptr)->dict; + } else if (obj->type == REDIS_SET) { + src[i].dict = (obj->ptr); + } else { zfree(src); addReply(c,shared.wrongtypeerr); return; } - src[i].dict = ((zset*)zsetobj->ptr)->dict; } /* default all weights to 1 */ @@ -5944,9 +6019,9 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { int remaining = c->argc - j; while (remaining) { - if (remaining >= (zsetnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { + if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { j++; remaining--; - for (i = 0; i < zsetnum; i++, j++, remaining--) { + for (i = 0; i < setnum; i++, j++, remaining--) { if (getDoubleFromObjectOrReply(c, c->argv[j], &src[i].weight, NULL) != REDIS_OK) return; } @@ -5974,7 +6049,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { /* sort sets from the smallest to largest, this will improve our * algorithm's performance */ - qsort(src,zsetnum,sizeof(zsetopsrc), qsortCompareZsetopsrcByCardinality); + qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality); dstobj = createZsetObject(); dstzset = dstobj->ptr; @@ -5987,12 +6062,12 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { di = dictGetIterator(src[0].dict); while((de = dictNext(di)) != NULL) { double *score = zmalloc(sizeof(double)), value; - *score = src[0].weight * (*(double*)dictGetEntryVal(de)); + *score = src[0].weight * zunionInterDictValue(de); - for (j = 1; j < zsetnum; j++) { + for (j = 1; j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { - value = src[j].weight * (*(double*)dictGetEntryVal(other)); + value = src[j].weight * zunionInterDictValue(other); zunionInterAggregate(score, value, aggregate); } else { break; @@ -6000,7 +6075,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { } /* skip entry when not present in every source dict */ - if (j != zsetnum) { + if (j != setnum) { zfree(score); } else { robj *o = dictGetEntryKey(de); @@ -6013,7 +6088,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictReleaseIterator(di); } } else if (op == REDIS_OP_UNION) { - for (i = 0; i < zsetnum; i++) { + for (i = 0; i < setnum; i++) { if (!src[i].dict) continue; di = dictGetIterator(src[i].dict); @@ -6022,14 +6097,14 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue; double *score = zmalloc(sizeof(double)), value; - *score = src[i].weight * (*(double*)dictGetEntryVal(de)); + *score = src[i].weight * zunionInterDictValue(de); /* because the zsets are sorted by size, its only possible * for sets at larger indices to hold this entry */ - for (j = (i+1); j < zsetnum; j++) { + for (j = (i+1); j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { - value = src[j].weight * (*(double*)dictGetEntryVal(other)); + value = src[j].weight * zunionInterDictValue(other); zunionInterAggregate(score, value, aggregate); } } @@ -6744,12 +6819,14 @@ static void convertToRealHash(robj *o) { static void flushdbCommand(redisClient *c) { server.dirty += dictSize(c->db->dict); + touchWatchedKeysOnFlush(c->db->id); dictEmpty(c->db->dict); dictEmpty(c->db->expires); addReply(c,shared.ok); } static void flushallCommand(redisClient *c) { + touchWatchedKeysOnFlush(-1); server.dirty += emptyDb(); addReply(c,shared.ok); if (server.bgsavechildpid != -1) { @@ -7454,6 +7531,10 @@ static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) { } static void multiCommand(redisClient *c) { + if (c->flags & REDIS_MULTI) { + addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n")); + return; + } c->flags |= REDIS_MULTI; addReply(c,shared.ok); } @@ -7494,6 +7575,17 @@ static void execCommand(redisClient *c) { return; } + /* Check if we need to abort the EXEC if some WATCHed key was touched. + * A failed EXEC will return a multi bulk nil object. */ + if (c->flags & REDIS_DIRTY_CAS) { + freeClientMultiState(c); + initClientMultiState(c); + c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); + unwatchAllKeys(c); + addReply(c,shared.nullmultibulk); + return; + } + /* Replicate a MULTI request now that we are sure the block is executed. * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency @@ -7501,6 +7593,7 @@ static void execCommand(redisClient *c) { execCommandReplicateMulti(c); /* Exec all the queued commands */ + unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; orig_argc = c->argc; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count)); @@ -7513,7 +7606,7 @@ static void execCommand(redisClient *c) { c->argc = orig_argc; freeClientMultiState(c); initClientMultiState(c); - c->flags &= (~REDIS_MULTI); + c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); /* Make sure the EXEC command is always replicated / AOF, since we * always send the MULTI command (we can't know beforehand if the * next operations will contain at least a modification to the DB). */ @@ -7540,7 +7633,7 @@ static void execCommand(redisClient *c) { * empty we need to block. In order to do so we remove the notification for * new data to read in the client socket (so that we'll not serve new * requests if the blocking request is not served). Also we put the client - * in a dictionary (db->blockingkeys) mapping keys to a list of clients + * in a dictionary (db->blocking_keys) mapping keys to a list of clients * blocking for this keys. * - If a PUSH operation against a key with blocked clients waiting is * performed, we serve the first in the list: basically instead to push @@ -7558,22 +7651,22 @@ static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeou list *l; int j; - c->blockingkeys = zmalloc(sizeof(robj*)*numkeys); - c->blockingkeysnum = numkeys; + c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); + c->blocking_keys_num = numkeys; c->blockingto = timeout; for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blockingkeys[j] = keys[j]; + c->blocking_keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ - de = dictFind(c->db->blockingkeys,keys[j]); + de = dictFind(c->db->blocking_keys,keys[j]); if (de == NULL) { int retval; /* For every key we take a list of clients blocked for it */ l = listCreate(); - retval = dictAdd(c->db->blockingkeys,keys[j],l); + retval = dictAdd(c->db->blocking_keys,keys[j],l); incrRefCount(keys[j]); assert(retval == DICT_OK); } else { @@ -7592,22 +7685,22 @@ static void unblockClientWaitingData(redisClient *c) { list *l; int j; - assert(c->blockingkeys != NULL); + assert(c->blocking_keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blockingkeysnum; j++) { + for (j = 0; j < c->blocking_keys_num; j++) { /* Remove this client from the list of clients waiting for this key. */ - de = dictFind(c->db->blockingkeys,c->blockingkeys[j]); + de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); assert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blockingkeys,c->blockingkeys[j]); - decrRefCount(c->blockingkeys[j]); + dictDelete(c->db->blocking_keys,c->blocking_keys[j]); + decrRefCount(c->blocking_keys[j]); } /* Cleanup the client structure */ - zfree(c->blockingkeys); - c->blockingkeys = NULL; + zfree(c->blocking_keys); + c->blocking_keys = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -7634,7 +7727,7 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { list *l; listNode *ln; - de = dictFind(c->db->blockingkeys,key); + de = dictFind(c->db->blocking_keys,key); if (de == NULL) return 0; l = dictGetEntryVal(de); ln = listFirst(l); @@ -10331,6 +10424,147 @@ static void publishCommand(redisClient *c) { addReplyLongLong(c,receivers); } +/* ===================== WATCH (CAS alike for MULTI/EXEC) =================== + * + * The implementation uses a per-DB hash table mapping keys to list of clients + * WATCHing those keys, so that given a key that is going to be modified + * we can mark all the associated clients as dirty. + * + * Also every client contains a list of WATCHed keys so that's possible to + * un-watch such keys when the client is freed or when UNWATCH is called. */ + +/* In the client->watched_keys list we need to use watchedKey structures + * as in order to identify a key in Redis we need both the key name and the + * DB */ +typedef struct watchedKey { + robj *key; + redisDb *db; +} watchedKey; + +/* Watch for the specified key */ +static void watchForKey(redisClient *c, robj *key) { + list *clients = NULL; + listIter li; + listNode *ln; + watchedKey *wk; + + /* Check if we are already watching for this key */ + listRewind(c->watched_keys,&li); + while((ln = listNext(&li))) { + wk = listNodeValue(ln); + if (wk->db == c->db && equalStringObjects(key,wk->key)) + return; /* Key already watched */ + } + /* This key is not already watched in this DB. Let's add it */ + clients = dictFetchValue(c->db->watched_keys,key); + if (!clients) { + clients = listCreate(); + dictAdd(c->db->watched_keys,key,clients); + incrRefCount(key); + } + listAddNodeTail(clients,c); + /* Add the new key to the lits of keys watched by this client */ + wk = zmalloc(sizeof(*wk)); + wk->key = key; + wk->db = c->db; + incrRefCount(key); + listAddNodeTail(c->watched_keys,wk); +} + +/* Unwatch all the keys watched by this client. To clean the EXEC dirty + * flag is up to the caller. */ +static void unwatchAllKeys(redisClient *c) { + listIter li; + listNode *ln; + + if (listLength(c->watched_keys) == 0) return; + listRewind(c->watched_keys,&li); + while((ln = listNext(&li))) { + list *clients; + watchedKey *wk; + + /* Lookup the watched key -> clients list and remove the client + * from the list */ + wk = listNodeValue(ln); + clients = dictFetchValue(wk->db->watched_keys, wk->key); + assert(clients != NULL); + listDelNode(clients,listSearchKey(clients,c)); + /* Kill the entry at all if this was the only client */ + if (listLength(clients) == 0) + dictDelete(wk->db->watched_keys, wk->key); + /* Remove this watched key from the client->watched list */ + listDelNode(c->watched_keys,ln); + decrRefCount(wk->key); + zfree(wk); + } +} + +/* "Touch" a key, so that if this key is being WATCHed by some client the + * next EXEC will fail. */ +static void touchWatchedKey(redisDb *db, robj *key) { + list *clients; + listIter li; + listNode *ln; + + if (dictSize(db->watched_keys) == 0) return; + clients = dictFetchValue(db->watched_keys, key); + if (!clients) return; + + /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ + /* Check if we are already watching for this key */ + listRewind(clients,&li); + while((ln = listNext(&li))) { + redisClient *c = listNodeValue(ln); + + c->flags |= REDIS_DIRTY_CAS; + } +} + +/* On FLUSHDB or FLUSHALL all the watched keys that are present before the + * flush but will be deleted as effect of the flushing operation should + * be touched. "dbid" is the DB that's getting the flush. -1 if it is + * a FLUSHALL operation (all the DBs flushed). */ +static void touchWatchedKeysOnFlush(int dbid) { + listIter li1, li2; + listNode *ln; + + /* For every client, check all the waited keys */ + listRewind(server.clients,&li1); + while((ln = listNext(&li1))) { + redisClient *c = listNodeValue(ln); + listRewind(c->watched_keys,&li2); + while((ln = listNext(&li2))) { + watchedKey *wk = listNodeValue(ln); + + /* For every watched key matching the specified DB, if the + * key exists, mark the client as dirty, as the key will be + * removed. */ + if (dbid == -1 || wk->db->id == dbid) { + if (dictFind(wk->db->dict, wk->key) != NULL) + c->flags |= REDIS_DIRTY_CAS; + } + } + } +} + +static void watchCommand(redisClient *c) { + int j; + + if (c->flags & REDIS_MULTI) { + addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n")); + return; + } + for (j = 1; j < c->argc; j++) + watchForKey(c,c->argv[j]); + addReply(c,shared.ok); +} + +static void unwatchCommand(redisClient *c) { + unwatchAllKeys(c); + c->flags &= (~REDIS_DIRTY_CAS); + addReply(c,shared.ok); +} + /* ================================= Debugging ============================== */ /* Compute the sha1 of string at 's' with 'len' bytes long. @@ -10417,18 +10651,23 @@ static void computeDatasetDigest(unsigned char *final) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key, *o; + robj *key, *o, *kcopy; time_t expiretime; memset(digest,0,20); /* This key-val digest */ key = dictGetEntryKey(de); - mixObjectDigest(digest,key); - if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) { + + if (!server.vm_enabled) { + mixObjectDigest(digest,key); o = dictGetEntryVal(de); - incrRefCount(o); } else { - o = vmPreviewObject(key); + /* Don't work with the key directly as when VM is active + * this is unsafe: TODO: fix decrRefCount to check if the + * count really reached 0 to avoid this mess */ + kcopy = dupStringObject(key); + mixObjectDigest(digest,kcopy); + o = lookupKeyRead(db,kcopy); + decrRefCount(kcopy); } aux = htonl(o->type); mixDigest(digest,&aux,sizeof(aux)); @@ -10497,7 +10736,6 @@ static void computeDatasetDigest(unsigned char *final) { } else { redisPanic("Unknown object type"); } - decrRefCount(o); /* If the key has an expire, add it to the mix */ if (expiretime != -1) xorDigest(digest,"!!expire!!",10); /* We can finally xor the key-val digest to the final digest */ @@ -10633,7 +10871,7 @@ static void debugCommand(redisClient *c) { static void _redisAssert(char *estr, char *file, int line) { redisLog(REDIS_WARNING,"=== ASSERTION FAILED ==="); - redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true\n",file,line,estr); + redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true",file,line,estr); #ifdef HAVE_BACKTRACE redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)"); *((char*)-1) = 'x'; @@ -10698,7 +10936,8 @@ static void daemonize(void) { } static void version() { - printf("Redis server version %s\n", REDIS_VERSION); + printf("Redis server version %s (%s:%d)\n", REDIS_VERSION, + REDIS_GIT_SHA1, atoi(REDIS_GIT_DIRTY) > 0); exit(0); } @@ -10712,6 +10951,7 @@ int main(int argc, char **argv) { time_t start; initServerConfig(); + sortCommandTable(); if (argc == 2) { if (strcmp(argv[1], "-v") == 0 || strcmp(argv[1], "--version") == 0) version(); @@ -10812,6 +11052,13 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) { _exit(0); } +static void sigtermHandler(int sig) { + REDIS_NOTUSED(sig); + + redisLog(REDIS_WARNING,"SIGTERM received, scheduling shutting down..."); + server.shutdown_asap = 1; +} + static void setupSigSegvAction(void) { struct sigaction act; @@ -10825,6 +11072,10 @@ static void setupSigSegvAction(void) { sigaction (SIGFPE, &act, NULL); sigaction (SIGILL, &act, NULL); sigaction (SIGBUS, &act, NULL); + + act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND; + act.sa_handler = sigtermHandler; + sigaction (SIGTERM, &act, NULL); return; }