+/* Open the VM swap file again and store the new stream and its descriptor
+ * in server.vm_fp and server.vm_fd.
+ *
+ * When the file can't be opened a warning is logged and the process
+ * terminates with _exit(1). */
+static void vmReopenSwapFile(void) {
+    /* Note: the old stream is not closed, as we are in the child process
+     * and don't want to mess at all with the original file object. */
+    server.vm_fp = fopen(server.vm_swap_file,"r+b");
+    if (server.vm_fp != NULL) {
+        server.vm_fd = fileno(server.vm_fp);
+        return;
+    }
+
+    redisLog(REDIS_WARNING,"Can't re-open the VM swap file: %s. Exiting.",
+        server.vm_swap_file);
+    _exit(1);
+}
+
+/* Append the job 'j' to server.io_newjobs, then spawn one more I/O thread
+ * if the number of active threads is still below server.vm_max_threads.
+ *
+ * This function must be called with threaded I/O locked. */
+static void queueIOJob(iojob *j) {
+    int below_thread_limit;
+
+    redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
+        (void*)j, j->type, (char*)j->key->ptr);
+    listAddNodeTail(server.io_newjobs,j);
+
+    below_thread_limit = server.io_active_threads < server.vm_max_threads;
+    if (below_thread_limit) spawnIOThread();
+}
+
+/* Queue a REDIS_IOJOB_PREPARE_SWAP job for the value 'val' stored at 'key'
+ * in 'db'.
+ *
+ * The job takes one extra reference on both the key and the value, the
+ * value is flagged as REDIS_VM_SWAPPING, and the job is queued while holding
+ * the threaded I/O lock. Always returns REDIS_OK. */
+static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
+    iojob *job = zmalloc(sizeof(*job));
+
+    job->type = REDIS_IOJOB_PREPARE_SWAP;
+    job->db = db;
+    job->canceled = 0;
+    job->thread = (pthread_t) -1;
+
+    /* The job holds its own reference to the key... */
+    job->key = key;
+    incrRefCount(key);
+    /* ...and to the value, which is also used as the job id. */
+    job->val = val;
+    job->id = val;
+    incrRefCount(val);
+
+    val->storage = REDIS_VM_SWAPPING;
+
+    lockThreadedIO();
+    queueIOJob(job);
+    unlockThreadedIO();
+    return REDIS_OK;
+}
+
+/* ============ Virtual Memory - Blocking clients on missing keys =========== */
+
+/* Make the client 'c' wait for the key 'key' to be loaded in memory.
+ *
+ * Returns 0 without registering anything when the key does not exist in
+ * c->db->dict, when its value is already in RAM, or when the value was
+ * being swapped out (in that case the swap job is canceled).
+ *
+ * Otherwise returns 1 after: appending the key to the client's io_keys
+ * list, appending the client to the list of waiters kept for that key in
+ * c->db->io_keys, and, if the value is swapped and no load is in progress,
+ * queueing a REDIS_IOJOB_LOAD job for it. */
+static int waitForSwappedKey(redisClient *c, robj *key) {
+    struct dictEntry *entry;
+    robj *val;
+    list *waiters;
+
+    /* Nothing to wait for if the key is missing or already in RAM. */
+    entry = dictFind(c->db->dict,key->ptr);
+    if (entry == NULL) return 0;
+    val = dictGetEntryVal(entry);
+    if (val->storage == REDIS_VM_MEMORY) return 0;
+    if (val->storage == REDIS_VM_SWAPPING) {
+        /* We were swapping the key, undo it! */
+        vmCancelThreadedIOJob(val);
+        return 0;
+    }
+
+    /* From here on the key is either swapped, or being loaded just now. */
+
+    /* Client => keys it is waiting for. */
+    listAddNodeTail(c->io_keys,key);
+    incrRefCount(key);
+
+    /* Swapped key => clients waiting for it. Every key has its own list
+     * of blocked clients, created the first time somebody waits for it. */
+    entry = dictFind(c->db->io_keys,key);
+    if (entry != NULL) {
+        waiters = dictGetEntryVal(entry);
+    } else {
+        int retval;
+
+        waiters = listCreate();
+        retval = dictAdd(c->db->io_keys,key,waiters);
+        incrRefCount(key);
+        assert(retval == DICT_OK);
+    }
+    listAddNodeTail(waiters,c);
+
+    /* Create a load job unless the key is already being loaded. */
+    if (val->storage == REDIS_VM_SWAPPED) {
+        vmpointer *vp = (vmpointer*)val;
+        iojob *job;
+
+        val->storage = REDIS_VM_LOADING;
+        job = zmalloc(sizeof(*job));
+        job->type = REDIS_IOJOB_LOAD;
+        job->db = c->db;
+        job->id = (robj*)vp;
+        job->page = vp->page;
+        job->val = NULL;
+        job->canceled = 0;
+        job->thread = (pthread_t) -1;
+        job->key = key;
+        incrRefCount(key);
+
+        lockThreadedIO();
+        queueIOJob(job);
+        unlockThreadedIO();
+    }
+    return 1;
+}
+
+/* Generic key preloading: call waitForSwappedKey() for every key argument
+ * of the command, as described by the vm_firstkey, vm_lastkey and
+ * vm_keystep fields of its command table entry.
+ *
+ * A vm_firstkey of 0 means there is nothing to preload; a negative
+ * vm_lastkey is taken relative to argc. */
+static void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) {
+    int idx, lastkey;
+
+    if (cmd->vm_firstkey == 0) return;
+
+    lastkey = cmd->vm_lastkey;
+    if (lastkey < 0) lastkey += argc;
+
+    idx = cmd->vm_firstkey;
+    while (idx <= lastkey) {
+        redisAssert(idx < argc);
+        waitForSwappedKey(c,argv[idx]);
+        idx += cmd->vm_keystep;
+    }
+}
+
+/* Preload keys needed for the ZUNIONSTORE and ZINTERSTORE commands.
+ *
+ * argv[2] holds the user-supplied number of keys, and the keys themselves
+ * start at argv[3]. Since the count comes from the client it is checked
+ * against argc before any argument is accessed: when more keys are
+ * announced than arguments are available nothing is preloaded.
+ *
+ * The count is parsed with strtol() instead of atoi(): for a value that
+ * does not fit the result type atoi() has undefined behavior, while
+ * strtol() saturates to LONG_MAX / LONG_MIN, which the checks below handle
+ * (too large => return, negative => empty loop). 'cmd' is unused. */
+static void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) {
+    long num;
+    int i;
+    REDIS_NOTUSED(cmd);
+
+    /* argv[2] must exist before we can read the number of keys. */
+    if (argc < 3) return;
+
+    num = strtol(argv[2]->ptr,NULL,10);
+    if (num > (argc-3)) return;
+    for (i = 0; i < num; i++) {
+        waitForSwappedKey(c,argv[3+i]);
+    }
+}
+
+/* Preload keys needed to execute the entire MULTI/EXEC block.
+ *
+ * Every command queued in c->mstate is preloaded the same way a single
+ * command would be: through its own vm_preload_proc when it has one,
+ * through waitForMultipleSwappedKeys() otherwise. Nothing is done if the
+ * client is not in the REDIS_MULTI state.
+ *
+ * 'cmd', 'argc' and 'argv' (describing EXEC itself) are not used. */
+static void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) {
+    int i;
+    REDIS_NOTUSED(cmd);
+    REDIS_NOTUSED(argc);
+    REDIS_NOTUSED(argv);
+
+    if (!(c->flags & REDIS_MULTI)) return;
+
+    for (i = 0; i < c->mstate.count; i++) {
+        struct redisCommand *queued = c->mstate.commands[i].cmd;
+        int qargc = c->mstate.commands[i].argc;
+        robj **qargv = c->mstate.commands[i].argv;
+
+        if (queued->vm_preload_proc == NULL) {
+            waitForMultipleSwappedKeys(c,queued,qargc,qargv);
+            continue;
+        }
+        queued->vm_preload_proc(c,queued,qargc,qargv);
+    }
+}
+
+/* Is this client attempting to run a command against swapped keys?
+ * If so, block it ASAP, load the keys in background, then resume it.
+ *
+ * The important idea about this function is that it can fail! If some key
+ * is still swapped when the client is resumed, the key lookup will simply
+ * block loading the key from disk. In practical terms this should only
+ * happen with the SORT BY command or if there is a bug in this function.
+ *
+ * Keys are registered through the command's vm_preload_proc when one is
+ * set, through waitForMultipleSwappedKeys() otherwise.
+ *
+ * Return 1 if the client is marked as blocked, 0 if the client can
+ * continue as the keys it is going to access appear to be in memory. */
+static int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) {
+    if (cmd->vm_preload_proc == NULL) {
+        waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv);
+    } else {
+        cmd->vm_preload_proc(c,cmd,c->argc,c->argv);
+    }
+
+    /* No key to wait for: the command can be executed right away. */
+    if (listLength(c->io_keys) == 0) return 0;
+
+    /* At least one key is missing: flag the client as blocked and stop
+     * reading from its socket. */
+    c->flags |= REDIS_IO_WAIT;
+    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
+    server.vm_blocked_clients++;
+    return 1;
+}
+
+/* Stop making the client 'c' wait for 'key'.
+ *
+ * The key is removed from the client's io_keys list and the client is
+ * removed from the list of waiters kept for that key in c->db->io_keys;
+ * the dictionary entry is deleted once its list becomes empty. Both the
+ * key and the client are asserted to be present.
+ *
+ * Returns 1 when the client is not waiting for any other key after the
+ * removal (so it can be unblocked), 0 otherwise. */
+static int dontWaitForSwappedKey(redisClient *c, robj *key) {
+    listIter iter;
+    listNode *node;
+    list *waiters;
+    struct dictEntry *entry;
+
+    /* Client side: drop the first io_keys node matching the key. */
+    listRewind(c->io_keys,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        if (!equalStringObjects(node->value,key)) continue;
+        listDelNode(c->io_keys,node);
+        break;
+    }
+    assert(node != NULL);
+
+    /* Database side: remove the client from the key => waiting clients
+     * map. */
+    entry = dictFind(c->db->io_keys,key);
+    assert(entry != NULL);
+    waiters = dictGetEntryVal(entry);
+    node = listSearchKey(waiters,c);
+    assert(node != NULL);
+    listDelNode(waiters,node);
+    if (listLength(waiters) == 0) {
+        dictDelete(c->db->io_keys,key);
+    }
+
+    return listLength(c->io_keys) == 0;
+}
+
+/* Every time we know a key was loaded back in memory, we handle the
+ * clients waiting for this key, if any.
+ *
+ * Each waiting client stops waiting for 'key'; clients that are left with
+ * no other key to wait for are appended to server.io_ready_clients. */
+static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) {
+    struct dictEntry *entry = dictFind(db->io_keys,key);
+    list *waiters;
+    int remaining;
+
+    if (entry == NULL) return;
+    waiters = dictGetEntryVal(entry);
+
+    /* Note: the number of waiters is read once, up front. We can't use
+     * something like while(listLength(waiters)) as the list can be freed
+     * by the called function when the last element is removed. */
+    for (remaining = listLength(waiters); remaining > 0; remaining--) {
+        listNode *first = listFirst(waiters);
+        redisClient *c = first->value;
+
+        if (!dontWaitForSwappedKey(c,key)) continue;
+        /* All the keys this client needs are loaded: it is ready to go. */
+        listAddNodeTail(server.io_ready_clients,c);
+    }
+}
+
+/* =========================== Remote Configuration ========================= */
+
+/* CONFIG SET <parameter> <value>: change a configuration parameter at
+ * runtime.
+ *
+ * c->argv[2] is the parameter name (matched case insensitively) and
+ * c->argv[3] the new value. The handled parameters are: dbfilename,
+ * requirepass, masterauth, maxmemory, timeout, appendfsync,
+ * no-appendfsync-on-rewrite, appendonly and save.
+ *
+ * The reply is shared.ok on success, or an -ERR reply when the parameter
+ * is not supported, the value is not valid for it, or AOF can't be turned
+ * on. The decoded value object is released on every path. */
+static void configSetCommand(redisClient *c) {
+    robj *o = getDecodedObject(c->argv[3]);
+    long long ll;
+
+    if (!strcasecmp(c->argv[2]->ptr,"dbfilename")) {
+        zfree(server.dbfilename);
+        server.dbfilename = zstrdup(o->ptr);
+    } else if (!strcasecmp(c->argv[2]->ptr,"requirepass")) {
+        zfree(server.requirepass);
+        server.requirepass = zstrdup(o->ptr);
+    } else if (!strcasecmp(c->argv[2]->ptr,"masterauth")) {
+        zfree(server.masterauth);
+        server.masterauth = zstrdup(o->ptr);
+    } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory")) {
+        if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+            ll < 0) goto badfmt;
+        server.maxmemory = ll;
+    } else if (!strcasecmp(c->argv[2]->ptr,"timeout")) {
+        if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+            ll < 0 || ll > LONG_MAX) goto badfmt;
+        server.maxidletime = ll;
+    } else if (!strcasecmp(c->argv[2]->ptr,"appendfsync")) {
+        if (!strcasecmp(o->ptr,"no")) {
+            server.appendfsync = APPENDFSYNC_NO;
+        } else if (!strcasecmp(o->ptr,"everysec")) {
+            server.appendfsync = APPENDFSYNC_EVERYSEC;
+        } else if (!strcasecmp(o->ptr,"always")) {
+            server.appendfsync = APPENDFSYNC_ALWAYS;
+        } else {
+            goto badfmt;
+        }
+    } else if (!strcasecmp(c->argv[2]->ptr,"no-appendfsync-on-rewrite")) {
+        int yn = yesnotoi(o->ptr);
+
+        if (yn == -1) goto badfmt;
+        server.no_appendfsync_on_rewrite = yn;
+    } else if (!strcasecmp(c->argv[2]->ptr,"appendonly")) {
+        int old = server.appendonly;
+        int new = yesnotoi(o->ptr);
+
+        if (new == -1) goto badfmt;
+        /* Only act when the setting actually changes. */
+        if (old != new) {
+            if (new == 0) {
+                stopAppendOnly();
+            } else {
+                if (startAppendOnly() == REDIS_ERR) {
+                    addReplySds(c,sdscatprintf(sdsempty(),
+                        "-ERR Unable to turn on AOF. Check server logs.\r\n"));
+                    decrRefCount(o);
+                    return;
+                }
+            }
+        }
+    } else if (!strcasecmp(c->argv[2]->ptr,"save")) {
+        int vlen, j;
+        sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen);
+
+        /* Perform sanity check before setting the new config:
+         * - Even number of args
+         * - Every token is a complete, non empty, base 10 number
+         * - Seconds >= 1 and not greater than LONG_MAX
+         * - Changes >= 0 and not greater than INT_MAX */
+        if (vlen & 1) {
+            sdsfreesplitres(v,vlen);
+            goto badfmt;
+        }
+        for (j = 0; j < vlen; j++) {
+            char *eptr;
+            long long val;
+            int is_seconds = (j & 1) == 0;
+
+            val = strtoll(v[j], &eptr, 10);
+            /* eptr == v[j] means no digit was consumed at all (empty
+             * token, e.g. because of a trailing or repeated space): such
+             * a token must not be silently accepted as zero. Out of range
+             * input makes strtoll() saturate, so it is caught by the
+             * upper bounds. */
+            if (eptr == v[j] || eptr[0] != '\0' ||
+                (is_seconds && (val < 1 || val > LONG_MAX)) ||
+                (!is_seconds && (val < 0 || val > INT_MAX))) {
+                sdsfreesplitres(v,vlen);
+                goto badfmt;
+            }
+        }
+        /* Finally set the new config */
+        resetServerSaveParams();
+        for (j = 0; j < vlen; j += 2) {
+            time_t seconds;
+            int changes;
+
+            seconds = strtoll(v[j],NULL,10);
+            changes = strtoll(v[j+1],NULL,10);
+            appendServerSaveParams(seconds, changes);
+        }
+        sdsfreesplitres(v,vlen);
+    } else {
+        addReplySds(c,sdscatprintf(sdsempty(),
+            "-ERR not supported CONFIG parameter %s\r\n",
+            (char*)c->argv[2]->ptr));
+        decrRefCount(o);
+        return;
+    }
+    decrRefCount(o);
+    addReply(c,shared.ok);
+    return;
+
+badfmt: /* Bad format errors */
+    addReplySds(c,sdscatprintf(sdsempty(),
+        "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n",
+            (char*)o->ptr,
+            (char*)c->argv[2]->ptr));
+    decrRefCount(o);
+}
+
+/* CONFIG GET <pattern>: reply with the name and the current value of every
+ * supported parameter whose name matches the glob-style pattern in
+ * c->argv[2].
+ *
+ * The reply is a multi bulk of name/value pairs. The number of elements is
+ * only known at the end, so an object with a NULL ptr is queued first as a
+ * placeholder for the "*<count>\r\n" header and filled in as the last step.
+ * NOTE(review): lenobj is written after decrRefCount(), so this presumably
+ * relies on addReply() retaining its own reference - confirm. */
+static void configGetCommand(redisClient *c) {
+    robj *o = getDecodedObject(c->argv[2]);
+    robj *lenobj = createObject(REDIS_STRING,NULL);
+    char *pattern = o->ptr;
+    int matches = 0;
+
+    addReply(c,lenobj);
+    decrRefCount(lenobj);
+
+    if (stringmatch(pattern,"dbfilename",0)) {
+        addReplyBulkCString(c,"dbfilename");
+        addReplyBulkCString(c,server.dbfilename);
+        matches++;
+    }
+    if (stringmatch(pattern,"requirepass",0)) {
+        addReplyBulkCString(c,"requirepass");
+        addReplyBulkCString(c,server.requirepass);
+        matches++;
+    }
+    if (stringmatch(pattern,"masterauth",0)) {
+        addReplyBulkCString(c,"masterauth");
+        addReplyBulkCString(c,server.masterauth);
+        matches++;
+    }
+    if (stringmatch(pattern,"maxmemory",0)) {
+        char buf[128];
+
+        ll2string(buf,128,server.maxmemory);
+        addReplyBulkCString(c,"maxmemory");
+        addReplyBulkCString(c,buf);
+        matches++;
+    }
+    if (stringmatch(pattern,"timeout",0)) {
+        char buf[128];
+
+        ll2string(buf,128,server.maxidletime);
+        addReplyBulkCString(c,"timeout");
+        addReplyBulkCString(c,buf);
+        matches++;
+    }
+    if (stringmatch(pattern,"appendonly",0)) {
+        addReplyBulkCString(c,"appendonly");
+        addReplyBulkCString(c,server.appendonly ? "yes" : "no");
+        matches++;
+    }
+    if (stringmatch(pattern,"no-appendfsync-on-rewrite",0)) {
+        addReplyBulkCString(c,"no-appendfsync-on-rewrite");
+        addReplyBulkCString(c,server.no_appendfsync_on_rewrite ? "yes" : "no");
+        matches++;
+    }
+    if (stringmatch(pattern,"appendfsync",0)) {
+        char *policy;
+
+        switch(server.appendfsync) {
+        case APPENDFSYNC_NO: policy = "no"; break;
+        case APPENDFSYNC_EVERYSEC: policy = "everysec"; break;
+        case APPENDFSYNC_ALWAYS: policy = "always"; break;
+        default: policy = "unknown"; break; /* too harmless to panic */
+        }
+        addReplyBulkCString(c,"appendfsync");
+        addReplyBulkCString(c,policy);
+        matches++;
+    }
+    if (stringmatch(pattern,"save",0)) {
+        sds buf = sdsempty();
+        int j;
+
+        /* Save points are rendered as "<seconds> <changes>" pairs joined
+         * by a single space. */
+        for (j = 0; j < server.saveparamslen; j++) {
+            /* The explicit cast makes the argument match %ld whatever the
+             * type of the seconds field is on this platform. */
+            buf = sdscatprintf(buf,"%ld %d",
+                    (long)server.saveparams[j].seconds,
+                    server.saveparams[j].changes);
+            if (j != server.saveparamslen-1)
+                buf = sdscatlen(buf," ",1);
+        }
+        addReplyBulkCString(c,"save");
+        addReplyBulkCString(c,buf);
+        sdsfree(buf);
+        matches++;
+    }
+    decrRefCount(o);
+    /* Two elements (name and value) are emitted for every match. */
+    lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2);
+}
+
+/* CONFIG <subcommand> ...: dispatch on c->argv[1] (case insensitive).
+ *
+ * - SET requires exactly 4 arguments and is served by configSetCommand().
+ * - GET requires exactly 3 arguments and is served by configGetCommand().
+ * - RESETSTAT requires exactly 2 arguments; it zeroes the commands,
+ *   connections and expired keys counters and sets the start time stat to
+ *   the current time.
+ *
+ * A wrong number of arguments or an unknown subcommand gets an -ERR
+ * reply. */
+static void configCommand(redisClient *c) {
+    char *subcommand = c->argv[1]->ptr;
+
+    if (!strcasecmp(subcommand,"set")) {
+        if (c->argc != 4) goto badarity;
+        configSetCommand(c);
+        return;
+    }
+    if (!strcasecmp(subcommand,"get")) {
+        if (c->argc != 3) goto badarity;
+        configGetCommand(c);
+        return;
+    }
+    if (!strcasecmp(subcommand,"resetstat")) {
+        if (c->argc != 2) goto badarity;
+        server.stat_numcommands = 0;
+        server.stat_numconnections = 0;
+        server.stat_expiredkeys = 0;
+        server.stat_starttime = time(NULL);
+        addReply(c,shared.ok);
+        return;
+    }
+
+    addReplySds(c,sdscatprintf(sdsempty(),
+        "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n"));
+    return;
+
+badarity:
+    addReplySds(c,sdscatprintf(sdsempty(),
+        "-ERR Wrong number of arguments for CONFIG %s\r\n",
+        (char*) c->argv[1]->ptr));
+}
+
+/* =========================== Pubsub implementation ======================== */
+
+/* Release a pubsubPattern: drop the reference it holds on the pattern
+ * object, then free the structure itself. */
+static void freePubsubPattern(void *p) {
+    pubsubPattern *entry = (pubsubPattern*)p;
+
+    decrRefCount(entry->pattern);
+    zfree(entry);
+}
+
+/* Compare two pubsubPattern structures: they match (1 is returned) when
+ * they refer to the same client and their pattern objects are equal
+ * strings, otherwise 0 is returned. */
+static int listMatchPubsubPattern(void *a, void *b) {
+    pubsubPattern *lhs = a;
+    pubsubPattern *rhs = b;
+
+    if (lhs->client != rhs->client) return 0;
+    return equalStringObjects(lhs->pattern,rhs->pattern) != 0;
+}
+
+/* Subscribe the client 'c' to 'channel'.
+ *
+ * The channel is added to the client's own set of channels and the client
+ * is appended to the list of subscribers kept for that channel in
+ * server.pubsub_channels (the list is created on first use). The client is
+ * notified in both cases, new subscription or not.
+ *
+ * Returns 1 if the operation succeeded, or 0 if the client was already
+ * subscribed to that channel. */
+static int pubsubSubscribeChannel(redisClient *c, robj *channel) {
+    int added = 0;
+
+    /* Client -> channels hash table. */
+    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
+        struct dictEntry *de;
+        list *subscribers;
+
+        added = 1;
+        incrRefCount(channel);
+
+        /* Channel -> list of clients hash table. */
+        de = dictFind(server.pubsub_channels,channel);
+        if (de != NULL) {
+            subscribers = dictGetEntryVal(de);
+        } else {
+            subscribers = listCreate();
+            dictAdd(server.pubsub_channels,channel,subscribers);
+            incrRefCount(channel);
+        }
+        listAddNodeTail(subscribers,c);
+    }
+
+    /* Notify the client */
+    addReply(c,shared.mbulk3);
+    addReply(c,shared.subscribebulk);
+    addReplyBulk(c,channel);
+    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
+    return added;
+}
+
+/* Unsubscribe the client 'c' from 'channel'.
+ *
+ * When 'notify' is non zero the client receives the unsubscribe reply,
+ * whether it was subscribed to the channel or not.
+ *
+ * Returns 1 if the operation succeeded, or 0 if the client was not
+ * subscribed to the specified channel. */
+static int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
+    int removed = 0;
+
+    /* 'channel' may be just a pointer to the same object we have in the
+     * hash tables: protect it with an extra reference while we delete. */
+    incrRefCount(channel);
+
+    /* Client -> channels hash table. */
+    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
+        struct dictEntry *de;
+        list *subscribers;
+        listNode *node;
+
+        removed = 1;
+
+        /* Channel -> list of clients hash table. */
+        de = dictFind(server.pubsub_channels,channel);
+        assert(de != NULL);
+        subscribers = dictGetEntryVal(de);
+        node = listSearchKey(subscribers,c);
+        assert(node != NULL);
+        listDelNode(subscribers,node);
+        if (listLength(subscribers) == 0) {
+            /* This was the last subscriber: delete the channel entry
+             * too, so that it is not possible to abuse Redis PUBSUB by
+             * creating millions of channels. */
+            dictDelete(server.pubsub_channels,channel);
+        }
+    }
+
+    /* Notify the client */
+    if (notify) {
+        addReply(c,shared.mbulk3);
+        addReply(c,shared.unsubscribebulk);
+        addReplyBulk(c,channel);
+        addReplyLongLong(c,dictSize(c->pubsub_channels)+
+                       listLength(c->pubsub_patterns));
+    }
+
+    decrRefCount(channel); /* it is finally safe to release it */
+    return removed;
+}
+
+/* Subscribe the client 'c' to 'pattern'.
+ *
+ * The pattern is appended to the client's own list of patterns, and a new
+ * pubsubPattern (client plus decoded pattern) is appended to
+ * server.pubsub_patterns. The client is notified in both cases, new
+ * subscription or not.
+ *
+ * Returns 1 if the operation succeeded, or 0 if the client was already
+ * subscribed to that pattern. */
+static int pubsubSubscribePattern(redisClient *c, robj *pattern) {
+    int is_new = listSearchKey(c->pubsub_patterns,pattern) == NULL;
+
+    if (is_new) {
+        pubsubPattern *entry;
+
+        /* Client side list. */
+        listAddNodeTail(c->pubsub_patterns,pattern);
+        incrRefCount(pattern);
+
+        /* Server side list. */
+        entry = zmalloc(sizeof(*entry));
+        entry->pattern = getDecodedObject(pattern);
+        entry->client = c;
+        listAddNodeTail(server.pubsub_patterns,entry);
+    }
+
+    /* Notify the client */
+    addReply(c,shared.mbulk3);
+    addReply(c,shared.psubscribebulk);
+    addReplyBulk(c,pattern);
+    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
+    return is_new ? 1 : 0;
+}
+
+/* Unsubscribe the client 'c' from 'pattern'.
+ *
+ * The pattern is removed from the client's own list and the matching
+ * client/pattern pair is removed from server.pubsub_patterns. When
+ * 'notify' is non zero the client receives the punsubscribe reply, whether
+ * it was subscribed to the pattern or not.
+ *
+ * Returns 1 if the operation succeeded, or 0 if the client was not
+ * subscribed to the specified pattern. */
+static int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
+    listNode *node;
+    int removed = 0;
+
+    /* Protect the object: it may be the same one we are going to remove. */
+    incrRefCount(pattern);
+
+    node = listSearchKey(c->pubsub_patterns,pattern);
+    if (node != NULL) {
+        pubsubPattern probe;
+
+        removed = 1;
+        listDelNode(c->pubsub_patterns,node);
+
+        /* Look up the server side entry using a temporary structure
+         * carrying the same client and pattern. */
+        probe.pattern = pattern;
+        probe.client = c;
+        node = listSearchKey(server.pubsub_patterns,&probe);
+        listDelNode(server.pubsub_patterns,node);
+    }
+
+    /* Notify the client */
+    if (notify) {
+        addReply(c,shared.mbulk3);
+        addReply(c,shared.punsubscribebulk);
+        addReplyBulk(c,pattern);
+        addReplyLongLong(c,dictSize(c->pubsub_channels)+
+                       listLength(c->pubsub_patterns));
+    }
+
+    decrRefCount(pattern);
+    return removed;
+}
+
+/* Unsubscribe the client from all the channels it is subscribed to,
+ * passing 'notify' along for every single channel.
+ *
+ * Returns the number of channels the client was unsubscribed from. */
+static int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
+    dictIterator *iter = dictGetIterator(c->pubsub_channels);
+    dictEntry *entry;
+    int unsubscribed = 0;
+
+    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
+        robj *channel = dictGetEntryKey(entry);
+
+        unsubscribed += pubsubUnsubscribeChannel(c,channel,notify);
+    }
+    dictReleaseIterator(iter);
+    return unsubscribed;
+}
+
+/* Unsubscribe the client from all the patterns it is subscribed to,
+ * passing 'notify' along for every single pattern.
+ *
+ * Returns the number of patterns the client was unsubscribed from. */
+static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
+    listIter iter;
+    listNode *node;
+    int unsubscribed = 0;
+
+    listRewind(c->pubsub_patterns,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        robj *pattern = node->value;
+
+        unsubscribed += pubsubUnsubscribePattern(c,pattern,notify);
+    }
+    return unsubscribed;
+}
+
+/* Publish 'message' on 'channel'.
+ *
+ * The message is delivered first to every client subscribed to the exact
+ * channel (as a 3 elements "message" reply), then to every client having
+ * a pattern matching the channel name (as a 4 elements "pmessage" reply).
+ *
+ * Returns the number of deliveries performed. */
+static int pubsubPublishMessage(robj *channel, robj *message) {
+    struct dictEntry *de;
+    listIter iter;
+    listNode *node;
+    int receivers = 0;
+
+    /* Send to clients listening for that channel */
+    de = dictFind(server.pubsub_channels,channel);
+    if (de != NULL) {
+        list *subscribers = dictGetEntryVal(de);
+
+        listRewind(subscribers,&iter);
+        while ((node = listNext(&iter)) != NULL) {
+            redisClient *c = node->value;
+
+            addReply(c,shared.mbulk3);
+            addReply(c,shared.messagebulk);
+            addReplyBulk(c,channel);
+            addReplyBulk(c,message);
+            receivers++;
+        }
+    }
+
+    /* Send to clients listening to matching channels */
+    if (listLength(server.pubsub_patterns) == 0) return receivers;
+
+    listRewind(server.pubsub_patterns,&iter);
+    /* The channel name is matched as a plain string: work on a decoded
+     * object, released once all the patterns were checked. */
+    channel = getDecodedObject(channel);
+    while ((node = listNext(&iter)) != NULL) {
+        pubsubPattern *pat = node->value;
+        int matched = stringmatchlen((char*)pat->pattern->ptr,
+                                     sdslen(pat->pattern->ptr),
+                                     (char*)channel->ptr,
+                                     sdslen(channel->ptr),0);
+
+        if (!matched) continue;
+        addReply(pat->client,shared.mbulk4);
+        addReply(pat->client,shared.pmessagebulk);
+        addReplyBulk(pat->client,pat->pattern);
+        addReplyBulk(pat->client,channel);
+        addReplyBulk(pat->client,message);
+        receivers++;
+    }
+    decrRefCount(channel);
+    return receivers;
+}
+
+/* SUBSCRIBE channel [channel ...]: subscribe the client to every channel
+ * given as argument. */
+static void subscribeCommand(redisClient *c) {
+    int idx = 1;
+
+    while (idx < c->argc) {
+        pubsubSubscribeChannel(c,c->argv[idx]);
+        idx++;
+    }
+}
+
+/* UNSUBSCRIBE [channel ...]: without arguments unsubscribe the client from
+ * all its channels, otherwise only from the channels given. The client is
+ * notified for every channel. */
+static void unsubscribeCommand(redisClient *c) {
+    int idx;
+
+    if (c->argc == 1) {
+        pubsubUnsubscribeAllChannels(c,1);
+        return;
+    }
+
+    idx = 1;
+    while (idx < c->argc) {
+        pubsubUnsubscribeChannel(c,c->argv[idx],1);
+        idx++;
+    }
+}
+
+/* PSUBSCRIBE pattern [pattern ...]: subscribe the client to every pattern
+ * given as argument. */
+static void psubscribeCommand(redisClient *c) {
+    int idx = 1;
+
+    while (idx < c->argc) {
+        pubsubSubscribePattern(c,c->argv[idx]);
+        idx++;
+    }
+}
+
+/* PUNSUBSCRIBE [pattern ...]: without arguments unsubscribe the client
+ * from all its patterns, otherwise only from the patterns given. The
+ * client is notified for every pattern. */
+static void punsubscribeCommand(redisClient *c) {
+    int idx;
+
+    if (c->argc == 1) {
+        pubsubUnsubscribeAllPatterns(c,1);
+        return;
+    }
+
+    idx = 1;
+    while (idx < c->argc) {
+        pubsubUnsubscribePattern(c,c->argv[idx],1);
+        idx++;
+    }
+}
+
+/* PUBLISH channel message: publish the message and reply with the number
+ * of receivers reported by pubsubPublishMessage(). */
+static void publishCommand(redisClient *c) {
+    robj *channel = c->argv[1];
+    robj *message = c->argv[2];
+
+    addReplyLongLong(c,pubsubPublishMessage(channel,message));
+}
+
+/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
+ *
+ * The implementation uses a per-DB hash table mapping keys to list of clients
+ * WATCHing those keys, so that given a key that is going to be modified
+ * we can mark all the associated clients as dirty.
+ *
+ * Also every client contains a list of WATCHed keys so that's possible to
+ * un-watch such keys when the client is freed or when UNWATCH is called. */
+
+/* Entries of the client->watched_keys list are watchedKey structures,
+ * because identifying a key in Redis requires both the key name and the
+ * DB it belongs to. */
+typedef struct watchedKey {
+ robj *key; /* name of the watched key */
+ redisDb *db; /* database the key is watched in */
+} watchedKey;
+
+/* Make the client 'c' watch 'key' in its currently selected DB.
+ *
+ * Nothing is done if the client already watches the key in this DB.
+ * Otherwise the client is appended to the list of watchers kept for the
+ * key in c->db->watched_keys (created on first use) and a new watchedKey
+ * entry is appended to c->watched_keys. */
+static void watchForKey(redisClient *c, robj *key) {
+    listIter iter;
+    listNode *node;
+    list *watchers;
+    watchedKey *wk;
+
+    /* Check if we are already watching for this key */
+    listRewind(c->watched_keys,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        watchedKey *seen = listNodeValue(node);
+
+        if (seen->db == c->db && equalStringObjects(key,seen->key))
+            return; /* Key already watched */
+    }
+
+    /* This key is not already watched in this DB. Let's add it */
+    watchers = dictFetchValue(c->db->watched_keys,key);
+    if (watchers == NULL) {
+        watchers = listCreate();
+        dictAdd(c->db->watched_keys,key,watchers);
+        incrRefCount(key);
+    }
+    listAddNodeTail(watchers,c);
+
+    /* Add the new key to the list of keys watched by this client */
+    wk = zmalloc(sizeof(*wk));
+    wk->db = c->db;
+    wk->key = key;
+    incrRefCount(key);
+    listAddNodeTail(c->watched_keys,wk);
+}
+
+/* Unwatch all the keys watched by this client. Clearing the EXEC dirty
+ * flag is up to the caller.
+ *
+ * For every watched key the client is removed from the key's list of
+ * watchers (the dictionary entry is deleted when the list gets empty),
+ * then the watchedKey entry is removed from the client's list and
+ * released. */
+static void unwatchAllKeys(redisClient *c) {
+    listIter iter;
+    listNode *node;
+
+    if (listLength(c->watched_keys) == 0) return;
+
+    listRewind(c->watched_keys,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        watchedKey *wk = listNodeValue(node);
+        /* Watched key -> clients list for the DB the key was watched in. */
+        list *watchers = dictFetchValue(wk->db->watched_keys, wk->key);
+
+        assert(watchers != NULL);
+        listDelNode(watchers,listSearchKey(watchers,c));
+        /* Kill the entry altogether if this was the only client */
+        if (listLength(watchers) == 0) {
+            dictDelete(wk->db->watched_keys, wk->key);
+        }
+
+        /* Remove this watched key from the client->watched list */
+        listDelNode(c->watched_keys,node);
+        decrRefCount(wk->key);
+        zfree(wk);
+    }
+}
+
+/* "Touch" a key, so that if this key is being WATCHed by some client the
+ * next EXEC will fail.
+ *
+ * Every client found in the list of watchers of 'key' in
+ * db->watched_keys gets the REDIS_DIRTY_CAS flag. */
+static void touchWatchedKey(redisDb *db, robj *key) {
+    list *watchers;
+    listIter iter;
+    listNode *node;
+
+    /* Quick exit when nothing is watched in this DB. */
+    if (dictSize(db->watched_keys) == 0) return;
+    watchers = dictFetchValue(db->watched_keys, key);
+    if (watchers == NULL) return;
+
+    /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
+    listRewind(watchers,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        redisClient *c = listNodeValue(node);
+
+        c->flags |= REDIS_DIRTY_CAS;
+    }
+}
+
+/* On FLUSHDB or FLUSHALL all the watched keys that are present before the
+ * flush but will be deleted as effect of the flushing operation should
+ * be touched. "dbid" is the DB that's getting the flush. -1 if it is
+ * a FLUSHALL operation (all the DBs flushed). */
+static void touchWatchedKeysOnFlush(int dbid) {
+    listIter client_iter, key_iter;
+    listNode *client_node, *key_node;
+
+    /* For every client, check all the watched keys */
+    listRewind(server.clients,&client_iter);
+    while ((client_node = listNext(&client_iter)) != NULL) {
+        redisClient *c = listNodeValue(client_node);
+
+        listRewind(c->watched_keys,&key_iter);
+        while ((key_node = listNext(&key_iter)) != NULL) {
+            watchedKey *wk = listNodeValue(key_node);
+
+            /* Only keys watched in a DB that is being flushed matter. */
+            if (dbid != -1 && wk->db->id != dbid) continue;
+            /* A key that does not exist is not removed by the flush. */
+            if (dictFind(wk->db->dict, wk->key->ptr) == NULL) continue;
+            /* The key exists and is going to be removed: the client is
+             * now dirty. */
+            c->flags |= REDIS_DIRTY_CAS;
+        }
+    }
+}
+
+static void watchCommand(redisClient *c) {
+ int j;
+
+ if (c->flags & REDIS_MULTI) {
+ addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n"));
+ return;