* POSSIBILITY OF SUCH DAMAGE.
*/
-#define REDIS_VERSION "1.3.10"
+#define REDIS_VERSION "2.1.1"
#include "fmacros.h"
#include "config.h"
#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
#include "zipmap.h" /* Compact dictionary-alike data structure */
#include "sha1.h" /* SHA1 is used for DEBUG DIGEST */
+#include "release.h" /* Release and/or git repository information */
/* Error codes */
#define REDIS_OK 0
#define REDIS_MULTI 8 /* This client is in a MULTI context */
#define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */
#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */
+#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
- dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
+ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *io_keys; /* Keys with clients waiting for VM I/O */
+ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
- robj **blockingkeys; /* The key we are waiting to terminate a blocking
+ robj **blocking_keys; /* The key we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
- int blockingkeysnum; /* Number of blocking keys */
+ int blocking_keys_num; /* Number of blocking keys */
time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
+ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
} redisClient;
int daemonize;
int appendonly;
int appendfsync;
+ int shutdown_asap;
time_t lastfsync;
int appendfd;
int appendseldb;
static void usage();
static int rewriteAppendOnlyFileBackground(void);
static int vmSwapObjectBlocking(robj *key, robj *val);
+static int prepareForShutdown();
+static void touchWatchedKey(redisDb *db, robj *key);
+static void touchWatchedKeysOnFlush(int dbid);
+static void unwatchAllKeys(redisClient *c);
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void psubscribeCommand(redisClient *c);
static void punsubscribeCommand(redisClient *c);
static void publishCommand(redisClient *c);
+static void watchCommand(redisClient *c);
+static void unwatchCommand(redisClient *c);
/*================================= Globals ================================= */
{"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
{"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0},
+ {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
+ {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{NULL,NULL,0,0,NULL,0,0,0}
};
* To access a global var is faster than calling time(NULL) */
server.unixtime = time(NULL);
+ /* We received a SIGTERM, shutting down here in a safe way, as it is
+ * not ok doing so inside the signal handler. */
+ if (server.shutdown_asap) {
+ if (prepareForShutdown() == REDIS_OK) exit(0);
+ redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
+ }
+
/* Show some info about non-empty databases */
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
server.vm_blocked_clients = 0;
server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
+ server.shutdown_asap = 0;
resetServerSaveParams();
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
- server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
+ server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+ server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
if (server.vm_enabled)
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
if (c->flags & REDIS_BLOCKED)
unblockClientWaitingData(c);
+ /* UNWATCH all the keys */
+ unwatchAllKeys(c);
+ listRelease(c->watched_keys);
/* Unsubscribe from all the pubsub channels */
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeAllPatterns(c,0);
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
- /* Remove from the list of clients waiting for swapped keys */
+ /* Remove from the list of clients that are now ready to be restarted
+ * after waiting for swapped keys */
if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) {
ln = listSearchKey(server.io_ready_clients,c);
if (ln) {
server.vm_blocked_clients--;
}
}
+ /* Remove from the list of clients waiting for swapped keys */
while (server.vm_enabled && listLength(c->io_keys)) {
ln = listFirst(c->io_keys);
dontWaitForSwappedKey(c,ln->value);
}
/* Exec the command */
- if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
+ if (c->flags & REDIS_MULTI &&
+ cmd->proc != execCommand && cmd->proc != discardCommand &&
+ cmd->proc != multiCommand && cmd->proc != watchCommand)
+ {
queueMultiCommand(c,cmd);
addReply(c,shared.queued);
} else {
c->reply = listCreate();
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
- c->blockingkeys = NULL;
- c->blockingkeysnum = 0;
+ c->blocking_keys = NULL;
+ c->blocking_keys_num = 0;
c->io_keys = listCreate();
+ c->watched_keys = listCreate();
listSetFreeMethod(c->io_keys,decrRefCount);
c->pubsub_channels = dictCreate(&setDictType,NULL);
c->pubsub_patterns = listCreate();
(unsigned long) strlen(buf),buf));
}
-static void addReplyLong(redisClient *c, long l) {
- char buf[128];
- size_t len;
-
- if (l == 0) {
- addReply(c,shared.czero);
- return;
- } else if (l == 1) {
- addReply(c,shared.cone);
- return;
- }
- len = snprintf(buf,sizeof(buf),":%ld\r\n",l);
- addReplySds(c,sdsnewlen(buf,len));
-}
-
static void addReplyLongLong(redisClient *c, long long ll) {
char buf[128];
size_t len;
addReply(c,shared.cone);
return;
}
- len = snprintf(buf,sizeof(buf),":%lld\r\n",ll);
- addReplySds(c,sdsnewlen(buf,len));
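+ /* Build the ":<value>\r\n" integer reply in place: ll2string writes the
+ * digits after the leading ':' and the trailing CRLF is added by hand,
+ * avoiding the snprintf() call. */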
+ buf[0] = ':';
+ len = ll2string(buf+1,sizeof(buf)-1,ll);
+ buf[len+1] = '\r';
+ buf[len+2] = '\n';
+ addReplySds(c,sdsnewlen(buf,len+3));
}
static void addReplyUlong(redisClient *c, unsigned long ul) {
}
static void addReplyBulkLen(redisClient *c, robj *obj) {
- size_t len;
+ size_t len, intlen;
+ char buf[128];
if (obj->encoding == REDIS_ENCODING_RAW) {
len = sdslen(obj->ptr);
len++;
}
}
- addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
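+ /* Same approach as addReplyLongLong: build the "$<len>\r\n" bulk length
+ * header with ll2string instead of sdscatprintf(). */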
+ buf[0] = '$';
+ intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
+ buf[intlen+1] = '\r';
+ buf[intlen+2] = '\n';
+ addReplySds(c,sdsnewlen(buf,intlen+3));
}
static void addReplyBulk(redisClient *c, robj *obj) {
incrRefCount(shared.integers[value]);
o = shared.integers[value];
} else {
- o = createObject(REDIS_STRING, NULL);
if (value >= LONG_MIN && value <= LONG_MAX) {
+ o = createObject(REDIS_STRING, NULL);
o->encoding = REDIS_ENCODING_INT;
o->ptr = (void*)((long)value);
} else {
static robj *lookupKeyWrite(redisDb *db, robj *key) {
deleteIfVolatile(db,key);
+ touchWatchedKey(db,key);
return lookupKey(db,key);
}
return REDIS_ERR; /* Just to avoid warning */
}
+/*================================== Shutdown =============================== */
+static int prepareForShutdown() {
+ redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
+ /* Kill the saving child if there is a background saving in progress.
+ We want to avoid race conditions, for instance our saving child may
+ overwrite the synchronous save performed by SHUTDOWN. */
+ if (server.bgsavechildpid != -1) {
+ redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
+ kill(server.bgsavechildpid,SIGKILL);
+ rdbRemoveTempFile(server.bgsavechildpid);
+ }
+ if (server.appendonly) {
+ /* Append only file: fsync() the AOF and exit */
+ fsync(server.appendfd);
+ if (server.vm_enabled) unlink(server.vm_swap_file);
+ } else {
+ /* Snapshotting. Perform a SYNC SAVE and exit */
+ if (rdbSave(server.dbfilename) == REDIS_OK) {
+ if (server.daemonize)
+ unlink(server.pidfile);
+ redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
+ } else {
+ /* Ooops.. error saving! The best we can do is to continue
+ * operating. Note that if there was a background saving process,
+ * in the next cron() Redis will be notified that the background
+ * saving aborted, handling special stuff like slaves pending for
+ * synchronization... */
+ redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
+ return REDIS_ERR;
+ }
+ }
+ redisLog(REDIS_WARNING,"Server exit now, bye bye...");
+ return REDIS_OK;
+}
+
/*================================== Commands =============================== */
static void authCommand(redisClient *c) {
}
}
+ touchWatchedKey(c->db,key);
if (nx) deleteIfVolatile(c->db,key);
retval = dictAdd(c->db->dict,key,val);
if (retval == DICT_ERR) {
if (getLongLongFromObjectOrReply(c,o,&value,NULL) != REDIS_OK) return;
value += incr;
- o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
- o = tryObjectEncoding(o);
+ o = createStringObjectFromLongLong(value);
retval = dictAdd(c->db->dict,c->argv[1],o);
if (retval == DICT_ERR) {
dictReplace(c->db->dict,c->argv[1],o);
for (j = 1; j < c->argc; j++) {
if (deleteKey(c->db,c->argv[j])) {
+ touchWatchedKey(c->db,c->argv[j]);
server.dirty++;
deleted++;
}
}
- addReplyLong(c,deleted);
+ addReplyLongLong(c,deleted);
}
static void existsCommand(redisClient *c) {
}
static void shutdownCommand(redisClient *c) {
- redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
- /* Kill the saving child if there is a background saving in progress.
- We want to avoid race conditions, for instance our saving child may
- overwrite the synchronous saving did by SHUTDOWN. */
- if (server.bgsavechildpid != -1) {
- redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
- kill(server.bgsavechildpid,SIGKILL);
- rdbRemoveTempFile(server.bgsavechildpid);
- }
- if (server.appendonly) {
- /* Append only file: fsync() the AOF and exit */
- fsync(server.appendfd);
- if (server.vm_enabled) unlink(server.vm_swap_file);
+ if (prepareForShutdown() == REDIS_OK)
exit(0);
- } else {
- /* Snapshotting. Perform a SYNC SAVE and exit */
- if (rdbSave(server.dbfilename) == REDIS_OK) {
- if (server.daemonize)
- unlink(server.pidfile);
- redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
- redisLog(REDIS_WARNING,"Server exit now, bye bye...");
- exit(0);
- } else {
- /* Ooops.. error saving! The best we can do is to continue
- * operating. Note that if there was a background saving process,
- * in the next cron() Redis will be notified that the background
- * saving aborted, handling special stuff like slaves pending for
- * synchronization... */
- redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
- addReplySds(c,
- sdsnew("-ERR can't quit, problems saving the DB\r\n"));
- }
- }
+ addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. Check logs.\r\n"));
}
static void renameGenericCommand(redisClient *c, int nx) {
incrRefCount(c->argv[2]);
}
deleteKey(c->db,c->argv[1]);
+ touchWatchedKey(c->db,c->argv[2]);
server.dirty++;
addReply(c,nx ? shared.cone : shared.ok);
}
incrRefCount(c->argv[2]);
}
server.dirty++;
- addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(list)));
+ addReplyLongLong(c,listLength(list));
}
static void lpushCommand(redisClient *c) {
if (dictSize((dict*)dstset->ptr) > 0) {
dictAdd(c->db->dict,dstkey,dstset);
incrRefCount(dstkey);
- addReplyLong(c,dictSize((dict*)dstset->ptr));
+ addReplyLongLong(c,dictSize((dict*)dstset->ptr));
} else {
decrRefCount(dstset);
addReply(c,shared.czero);
if (dictSize((dict*)dstset->ptr) > 0) {
dictAdd(c->db->dict,dstkey,dstset);
incrRefCount(dstkey);
- addReplyLong(c,dictSize((dict*)dstset->ptr));
+ addReplyLongLong(c,dictSize((dict*)dstset->ptr));
} else {
decrRefCount(dstset);
addReply(c,shared.czero);
zskiplistNode *zn = zmalloc(sizeof(*zn));
zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
- if (level > 0)
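+ /* zn->span has level-1 entries, so a node of level 1 gets a NULL span
+ * array instead of a zero byte allocation. */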
+ if (level > 1)
zn->span = zmalloc(sizeof(unsigned int) * (level - 1));
+ else
+ zn->span = NULL;
zn->score = score;
zn->obj = obj;
return zn;
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]);
server.dirty += deleted;
- addReplyLong(c,deleted);
+ addReplyLongLong(c,deleted);
}
static void zremrangebyrankCommand(redisClient *c) {
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]);
server.dirty += deleted;
- addReplyLong(c, deleted);
+ addReplyLongLong(c, deleted);
}
typedef struct {
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
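+/* Fetch the per-member score from a source dict entry. For sources that are
+ * plain sets there is no score (the dict value is NULL), so count such
+ * members as having a score of 1.0. */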
+#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
if (aggregate == REDIS_AGGR_SUM) {
}
static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
- int i, j, zsetnum;
+ int i, j, setnum;
int aggregate = REDIS_AGGR_SUM;
zsetopsrc *src;
robj *dstobj;
dictIterator *di;
dictEntry *de;
- /* expect zsetnum input keys to be given */
- zsetnum = atoi(c->argv[2]->ptr);
- if (zsetnum < 1) {
+ /* expect setnum input keys to be given */
+ setnum = atoi(c->argv[2]->ptr);
+ if (setnum < 1) {
addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n"));
return;
}
/* test if the expected number of keys would overflow */
- if (3+zsetnum > c->argc) {
+ if (3+setnum > c->argc) {
addReply(c,shared.syntaxerr);
return;
}
/* read keys to be used for input */
- src = zmalloc(sizeof(zsetopsrc) * zsetnum);
- for (i = 0, j = 3; i < zsetnum; i++, j++) {
- robj *zsetobj = lookupKeyWrite(c->db,c->argv[j]);
- if (!zsetobj) {
+ src = zmalloc(sizeof(zsetopsrc) * setnum);
+ for (i = 0, j = 3; i < setnum; i++, j++) {
+ robj *obj = lookupKeyWrite(c->db,c->argv[j]);
+ if (!obj) {
src[i].dict = NULL;
} else {
- if (zsetobj->type != REDIS_ZSET) {
+ if (obj->type == REDIS_ZSET) {
+ src[i].dict = ((zset*)obj->ptr)->dict;
+ } else if (obj->type == REDIS_SET) {
+ src[i].dict = (obj->ptr);
+ } else {
zfree(src);
addReply(c,shared.wrongtypeerr);
return;
}
- src[i].dict = ((zset*)zsetobj->ptr)->dict;
}
/* default all weights to 1 */
int remaining = c->argc - j;
while (remaining) {
- if (remaining >= (zsetnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
+ if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
j++; remaining--;
- for (i = 0; i < zsetnum; i++, j++, remaining--) {
+ for (i = 0; i < setnum; i++, j++, remaining--) {
if (getDoubleFromObjectOrReply(c, c->argv[j], &src[i].weight, NULL) != REDIS_OK)
return;
}
/* sort sets from the smallest to largest, this will improve our
* algorithm's performance */
- qsort(src,zsetnum,sizeof(zsetopsrc), qsortCompareZsetopsrcByCardinality);
+ qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality);
dstobj = createZsetObject();
dstzset = dstobj->ptr;
di = dictGetIterator(src[0].dict);
while((de = dictNext(di)) != NULL) {
double *score = zmalloc(sizeof(double)), value;
- *score = src[0].weight * (*(double*)dictGetEntryVal(de));
+ *score = src[0].weight * zunionInterDictValue(de);
- for (j = 1; j < zsetnum; j++) {
+ for (j = 1; j < setnum; j++) {
dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
if (other) {
- value = src[j].weight * (*(double*)dictGetEntryVal(other));
+ value = src[j].weight * zunionInterDictValue(other);
zunionInterAggregate(score, value, aggregate);
} else {
break;
}
/* skip entry when not present in every source dict */
- if (j != zsetnum) {
+ if (j != setnum) {
zfree(score);
} else {
robj *o = dictGetEntryKey(de);
dictReleaseIterator(di);
}
} else if (op == REDIS_OP_UNION) {
- for (i = 0; i < zsetnum; i++) {
+ for (i = 0; i < setnum; i++) {
if (!src[i].dict) continue;
di = dictGetIterator(src[i].dict);
if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue;
double *score = zmalloc(sizeof(double)), value;
- *score = src[i].weight * (*(double*)dictGetEntryVal(de));
+ *score = src[i].weight * zunionInterDictValue(de);
* because the zsets are sorted by size, it's only possible
* for sets at larger indices to hold this entry */
- for (j = (i+1); j < zsetnum; j++) {
+ for (j = (i+1); j < setnum; j++) {
dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
if (other) {
- value = src[j].weight * (*(double*)dictGetEntryVal(other));
+ value = src[j].weight * zunionInterDictValue(other);
zunionInterAggregate(score, value, aggregate);
}
}
if (dstzset->zsl->length) {
dictAdd(c->db->dict,dstkey,dstobj);
incrRefCount(dstkey);
- addReplyLong(c, dstzset->zsl->length);
+ addReplyLongLong(c, dstzset->zsl->length);
server.dirty++;
} else {
decrRefCount(dstobj);
if (limit > 0) limit--;
}
if (justcount) {
- addReplyLong(c,(long)rangelen);
+ addReplyLongLong(c,(long)rangelen);
} else {
lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",
withscores ? (rangelen*2) : rangelen);
rank = zslGetRank(zsl, *score, c->argv[2]);
if (rank) {
if (reverse) {
- addReplyLong(c, zsl->length - rank);
+ addReplyLongLong(c, zsl->length - rank);
} else {
- addReplyLong(c, rank-1);
+ addReplyLongLong(c, rank-1);
}
} else {
addReply(c,shared.nullbulk);
static void flushdbCommand(redisClient *c) {
server.dirty += dictSize(c->db->dict);
+ touchWatchedKeysOnFlush(c->db->id);
dictEmpty(c->db->dict);
dictEmpty(c->db->expires);
addReply(c,shared.ok);
}
static void flushallCommand(redisClient *c) {
+ touchWatchedKeysOnFlush(-1);
server.dirty += emptyDb();
addReply(c,shared.ok);
if (server.bgsavechildpid != -1) {
bytesToHuman(hmem,zmalloc_used_memory());
info = sdscatprintf(sdsempty(),
"redis_version:%s\r\n"
+ "redis_git_sha1:%s\r\n"
+ "redis_git_dirty:%d\r\n"
"arch_bits:%s\r\n"
"multiplexing_api:%s\r\n"
"process_id:%ld\r\n"
"vm_enabled:%d\r\n"
"role:%s\r\n"
,REDIS_VERSION,
+ REDIS_GIT_SHA1,
+ strtol(REDIS_GIT_DIRTY,NULL,10) > 0,
(sizeof(long) == 8) ? "64" : "32",
aeGetApiName(),
(long) getpid(),
}
static void multiCommand(redisClient *c) {
+ if (c->flags & REDIS_MULTI) {
+ addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n"));
+ return;
+ }
c->flags |= REDIS_MULTI;
addReply(c,shared.ok);
}
return;
}
+ /* Check if we need to abort the EXEC if some WATCHed key was touched.
+ * A failed EXEC will return a multi bulk nil object. */
+ if (c->flags & REDIS_DIRTY_CAS) {
+ freeClientMultiState(c);
+ initClientMultiState(c);
+ c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
+ unwatchAllKeys(c);
+ addReply(c,shared.nullmultibulk);
+ return;
+ }
+
/* Replicate a MULTI request now that we are sure the block is executed.
* This way we'll deliver the MULTI/..../EXEC block as a whole and
* both the AOF and the replication link will have the same consistency
execCommandReplicateMulti(c);
/* Exec all the queued commands */
+ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv;
orig_argc = c->argc;
addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
c->argc = orig_argc;
freeClientMultiState(c);
initClientMultiState(c);
- c->flags &= (~REDIS_MULTI);
+ c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
/* Make sure the EXEC command is always replicated / AOF, since we
* always send the MULTI command (we can't know beforehand if the
* next operations will contain at least a modification to the DB). */
* empty we need to block. In order to do so we remove the notification for
* new data to read in the client socket (so that we'll not serve new
* requests if the blocking request is not served). Also we put the client
- * in a dictionary (db->blockingkeys) mapping keys to a list of clients
+ * in a dictionary (db->blocking_keys) mapping keys to a list of clients
* blocking for these keys.
* - If a PUSH operation against a key with blocked clients waiting is
* performed, we serve the first in the list: basically instead to push
list *l;
int j;
- c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
- c->blockingkeysnum = numkeys;
+ c->blocking_keys = zmalloc(sizeof(robj*)*numkeys);
+ c->blocking_keys_num = numkeys;
c->blockingto = timeout;
for (j = 0; j < numkeys; j++) {
/* Add the key in the client structure, to map clients -> keys */
- c->blockingkeys[j] = keys[j];
+ c->blocking_keys[j] = keys[j];
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
- de = dictFind(c->db->blockingkeys,keys[j]);
+ de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
- retval = dictAdd(c->db->blockingkeys,keys[j],l);
+ retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
assert(retval == DICT_OK);
} else {
list *l;
int j;
- assert(c->blockingkeys != NULL);
+ assert(c->blocking_keys != NULL);
/* The client may wait for multiple keys, so unblock it for every key. */
- for (j = 0; j < c->blockingkeysnum; j++) {
+ for (j = 0; j < c->blocking_keys_num; j++) {
/* Remove this client from the list of clients waiting for this key. */
- de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
+ de = dictFind(c->db->blocking_keys,c->blocking_keys[j]);
assert(de != NULL);
l = dictGetEntryVal(de);
listDelNode(l,listSearchKey(l,c));
/* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0)
- dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
- decrRefCount(c->blockingkeys[j]);
+ dictDelete(c->db->blocking_keys,c->blocking_keys[j]);
+ decrRefCount(c->blocking_keys[j]);
}
/* Cleanup the client structure */
- zfree(c->blockingkeys);
- c->blockingkeys = NULL;
+ zfree(c->blocking_keys);
+ c->blocking_keys = NULL;
c->flags &= (~REDIS_BLOCKED);
server.blpop_blocked_clients--;
/* We want to process data if there is some command waiting
list *l;
listNode *ln;
- de = dictFind(c->db->blockingkeys,key);
+ de = dictFind(c->db->blocking_keys,key);
if (de == NULL) return 0;
l = dictGetEntryVal(de);
ln = listFirst(l);
* as a fully non-blocking VM.
*/
+/* Called when the user switches from "appendonly yes" to "appendonly no"
+ * at runtime using the CONFIG command. */
+static void stopAppendOnly(void) {
+ flushAppendOnlyFile();
+ fsync(server.appendfd);
+ close(server.appendfd);
+
+ server.appendfd = -1;
+ server.appendseldb = -1;
+ server.appendonly = 0;
+ /* rewrite operation in progress? kill it and wait for the child to exit */
+ if (server.bgsavechildpid != -1) {
+ int statloc;
+
+ if (kill(server.bgsavechildpid,SIGKILL) != -1)
+ wait3(&statloc,0,NULL);
+ /* reset the buffer accumulating changes while the child saves */
+ sdsfree(server.bgrewritebuf);
+ server.bgrewritebuf = sdsempty();
+ server.bgsavechildpid = -1;
+ }
+}
+
+/* Called when the user switches from "appendonly no" to "appendonly yes"
+ * at runtime using the CONFIG command. */
+static int startAppendOnly(void) {
+ server.appendonly = 1;
+ server.lastfsync = time(NULL);
+ server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
+ if (server.appendfd == -1) {
+ redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno));
+ return REDIS_ERR;
+ }
+ if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
+ server.appendonly = 0;
+ close(server.appendfd);
+ redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno));
+ return REDIS_ERR;
+ }
+ return REDIS_OK;
+}
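+
+/* Illustrative note: "CONFIG SET appendonly yes" ends up calling
+ * startAppendOnly(), which also triggers a background AOF rewrite so that the
+ * new append only file reflects the current dataset, while "CONFIG SET
+ * appendonly no" calls stopAppendOnly(). */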
+
/* =================== Virtual Memory - Blocking Side ====================== */
static void vmInit(void) {
static void configSetCommand(redisClient *c) {
robj *o = getDecodedObject(c->argv[3]);
+ long long ll;
+
if (!strcasecmp(c->argv[2]->ptr,"dbfilename")) {
zfree(server.dbfilename);
server.dbfilename = zstrdup(o->ptr);
zfree(server.masterauth);
server.masterauth = zstrdup(o->ptr);
} else if (!strcasecmp(c->argv[2]->ptr,"maxmemory")) {
- server.maxmemory = strtoll(o->ptr, NULL, 10);
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+ ll < 0) goto badfmt;
+ server.maxmemory = ll;
+ } else if (!strcasecmp(c->argv[2]->ptr,"timeout")) {
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+ ll < 0 || ll > LONG_MAX) goto badfmt;
+ server.maxidletime = ll;
} else if (!strcasecmp(c->argv[2]->ptr,"appendfsync")) {
if (!strcasecmp(o->ptr,"no")) {
server.appendfsync = APPENDFSYNC_NO;
} else {
goto badfmt;
}
+ } else if (!strcasecmp(c->argv[2]->ptr,"appendonly")) {
+ int old = server.appendonly;
+ int new = yesnotoi(o->ptr);
+
+ if (new == -1) goto badfmt;
+ if (old != new) {
+ if (new == 0) {
+ stopAppendOnly();
+ } else {
+ if (startAppendOnly() == REDIS_ERR) {
+ addReplySds(c,sdscatprintf(sdsempty(),
+ "-ERR Unable to turn on AOF. Check server logs.\r\n"));
+ decrRefCount(o);
+ return;
+ }
+ }
+ }
} else if (!strcasecmp(c->argv[2]->ptr,"save")) {
int vlen, j;
sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen);
if (stringmatch(pattern,"maxmemory",0)) {
char buf[128];
- snprintf(buf,128,"%llu\n",server.maxmemory);
+ ll2string(buf,128,server.maxmemory);
addReplyBulkCString(c,"maxmemory");
addReplyBulkCString(c,buf);
matches++;
}
+ if (stringmatch(pattern,"timeout",0)) {
+ char buf[128];
+
+ ll2string(buf,128,server.maxidletime);
+ addReplyBulkCString(c,"timeout");
+ addReplyBulkCString(c,buf);
+ matches++;
+ }
+ if (stringmatch(pattern,"appendonly",0)) {
+ addReplyBulkCString(c,"appendonly");
+ addReplyBulkCString(c,server.appendonly ? "yes" : "no");
+ matches++;
+ }
if (stringmatch(pattern,"appendfsync",0)) {
char *policy;
addReply(c,shared.mbulk3);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
- addReplyLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
+ addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
return retval;
}
addReply(c,shared.mbulk3);
addReply(c,shared.unsubscribebulk);
addReplyBulk(c,channel);
- addReplyLong(c,dictSize(c->pubsub_channels)+
+ addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
addReply(c,shared.mbulk3);
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
- addReplyLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
+ addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
return retval;
}
addReply(c,shared.mbulk3);
addReply(c,shared.punsubscribebulk);
addReplyBulk(c,pattern);
- addReplyLong(c,dictSize(c->pubsub_channels)+
+ addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
decrRefCount(pattern);
static void publishCommand(redisClient *c) {
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
- addReplyLong(c,receivers);
+ addReplyLongLong(c,receivers);
+}
+
+/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
+ *
+ * The implementation uses a per-DB hash table mapping keys to list of clients
+ * WATCHing those keys, so that given a key that is going to be modified
+ * we can mark all the associated clients as dirty.
+ *
+ * Also every client contains a list of WATCHed keys so that it's possible to
+ * un-watch such keys when the client is freed or when UNWATCH is called. */
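+
+/* A minimal sketch of the client-visible pattern this enables (illustrative
+ * only, not part of the implementation):
+ *
+ *   WATCH mykey
+ *   val = GET mykey
+ *   MULTI
+ *   SET mykey <new value computed from val>
+ *   EXEC   <- returns a nil multi bulk reply if mykey was modified after WATCH
+ */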
+
+/* In the client->watched_keys list we need to use watchedKey structures
+ * because in order to identify a key in Redis we need both the key name and the
+ * DB */
+typedef struct watchedKey {
+ robj *key;
+ redisDb *db;
+} watchedKey;
+
+/* Watch for the specified key */
+static void watchForKey(redisClient *c, robj *key) {
+ list *clients = NULL;
+ listIter li;
+ listNode *ln;
+ watchedKey *wk;
+
+ /* Check if we are already watching for this key */
+ listRewind(c->watched_keys,&li);
+ while((ln = listNext(&li))) {
+ wk = listNodeValue(ln);
+ if (wk->db == c->db && equalStringObjects(key,wk->key))
+ return; /* Key already watched */
+ }
+ /* This key is not already watched in this DB. Let's add it */
+ clients = dictFetchValue(c->db->watched_keys,key);
+ if (!clients) {
+ clients = listCreate();
+ dictAdd(c->db->watched_keys,key,clients);
+ incrRefCount(key);
+ }
+ listAddNodeTail(clients,c);
+ /* Add the new key to the list of keys watched by this client */
+ wk = zmalloc(sizeof(*wk));
+ wk->key = key;
+ wk->db = c->db;
+ incrRefCount(key);
+ listAddNodeTail(c->watched_keys,wk);
+}
+
+/* Unwatch all the keys watched by this client. Clearing the EXEC dirty
+ * flag is up to the caller. */
+static void unwatchAllKeys(redisClient *c) {
+ listIter li;
+ listNode *ln;
+
+ if (listLength(c->watched_keys) == 0) return;
+ listRewind(c->watched_keys,&li);
+ while((ln = listNext(&li))) {
+ list *clients;
+ watchedKey *wk;
+
+ /* Lookup the watched key -> clients list and remove the client
+ * from the list */
+ wk = listNodeValue(ln);
+ clients = dictFetchValue(wk->db->watched_keys, wk->key);
+ assert(clients != NULL);
+ listDelNode(clients,listSearchKey(clients,c));
+ /* Remove the entry altogether if this was the only client */
+ if (listLength(clients) == 0)
+ dictDelete(wk->db->watched_keys, wk->key);
+ /* Remove this watched key from the client->watched list */
+ listDelNode(c->watched_keys,ln);
+ decrRefCount(wk->key);
+ zfree(wk);
+ }
+}
+
+/* "Touch" a key, so that if this key is being WATCHed by some client the
+ * next EXEC will fail. */
+static void touchWatchedKey(redisDb *db, robj *key) {
+ list *clients;
+ listIter li;
+ listNode *ln;
+
+ if (dictSize(db->watched_keys) == 0) return;
+ clients = dictFetchValue(db->watched_keys, key);
+ if (!clients) return;
+
+ /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
+ listRewind(clients,&li);
+ while((ln = listNext(&li))) {
+ redisClient *c = listNodeValue(ln);
+
+ c->flags |= REDIS_DIRTY_CAS;
+ }
+}
+
+/* On FLUSHDB or FLUSHALL, all the watched keys that are present before the
+ * flush and that will be deleted by it should be touched.
+ * "dbid" is the DB that's getting the flush. -1 if it is
+ * a FLUSHALL operation (all the DBs flushed). */
+static void touchWatchedKeysOnFlush(int dbid) {
+ listIter li1, li2;
+ listNode *ln;
+
+ /* For every client, check all the watched keys */
+ listRewind(server.clients,&li1);
+ while((ln = listNext(&li1))) {
+ redisClient *c = listNodeValue(ln);
+ listRewind(c->watched_keys,&li2);
+ while((ln = listNext(&li2))) {
+ watchedKey *wk = listNodeValue(ln);
+
+ /* For every watched key matching the specified DB, if the
+ * key exists, mark the client as dirty, as the key will be
+ * removed. */
+ if (dbid == -1 || wk->db->id == dbid) {
+ if (dictFind(wk->db->dict, wk->key) != NULL)
+ c->flags |= REDIS_DIRTY_CAS;
+ }
+ }
+ }
+}
+
+static void watchCommand(redisClient *c) {
+ int j;
+
+ if (c->flags & REDIS_MULTI) {
+ addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n"));
+ return;
+ }
+ for (j = 1; j < c->argc; j++)
+ watchForKey(c,c->argv[j]);
+ addReply(c,shared.ok);
+}
+
+static void unwatchCommand(redisClient *c) {
+ unwatchAllKeys(c);
+ c->flags &= (~REDIS_DIRTY_CAS);
+ addReply(c,shared.ok);
}
/* ================================= Debugging ============================== */
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
- robj *key, *o;
+ robj *key, *o, *kcopy;
time_t expiretime;
memset(digest,0,20); /* This key-val digest */
key = dictGetEntryKey(de);
- mixObjectDigest(digest,key);
- if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY ||
- key->storage == REDIS_VM_SWAPPING) {
+
+ if (!server.vm_enabled) {
+ mixObjectDigest(digest,key);
o = dictGetEntryVal(de);
- incrRefCount(o);
} else {
- o = vmPreviewObject(key);
+ /* Don't work with the key directly, as this is unsafe when VM is
+ * active. TODO: fix decrRefCount to check if the refcount really
+ * reached 0, to avoid this mess. */
+ kcopy = dupStringObject(key);
+ mixObjectDigest(digest,kcopy);
+ o = lookupKeyRead(db,kcopy);
+ decrRefCount(kcopy);
}
aux = htonl(o->type);
mixDigest(digest,&aux,sizeof(aux));
} else {
redisPanic("Unknown object type");
}
- decrRefCount(o);
/* If the key has an expire, add it to the mix */
if (expiretime != -1) xorDigest(digest,"!!expire!!",10);
/* We can finally xor the key-val digest to the final digest */
static void _redisAssert(char *estr, char *file, int line) {
redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
- redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true\n",file,line,estr);
+ redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true",file,line,estr);
#ifdef HAVE_BACKTRACE
redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
*((char*)-1) = 'x';
_exit(0);
}
+static void sigtermHandler(int sig) {
+ REDIS_NOTUSED(sig);
+
+ redisLog(REDIS_WARNING,"SIGTERM received, scheduling shutting down...");
+ server.shutdown_asap = 1;
+}
+
static void setupSigSegvAction(void) {
struct sigaction act;
sigaction (SIGFPE, &act, NULL);
sigaction (SIGILL, &act, NULL);
sigaction (SIGBUS, &act, NULL);
+
+ act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND;
+ act.sa_handler = sigtermHandler;
+ sigaction (SIGTERM, &act, NULL);
return;
}