+/* =========================== Remote Configuration ========================= */
+
+static void configSetCommand(redisClient *c) {
+ robj *o = getDecodedObject(c->argv[3]);
+ long long ll;
+
+ if (!strcasecmp(c->argv[2]->ptr,"dbfilename")) {
+ zfree(server.dbfilename);
+ server.dbfilename = zstrdup(o->ptr);
+ } else if (!strcasecmp(c->argv[2]->ptr,"requirepass")) {
+ zfree(server.requirepass);
+ server.requirepass = zstrdup(o->ptr);
+ } else if (!strcasecmp(c->argv[2]->ptr,"masterauth")) {
+ zfree(server.masterauth);
+ server.masterauth = zstrdup(o->ptr);
+ } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory")) {
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+ ll < 0) goto badfmt;
+ server.maxmemory = ll;
+ } else if (!strcasecmp(c->argv[2]->ptr,"timeout")) {
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+ ll < 0 || ll > LONG_MAX) goto badfmt;
+ server.maxidletime = ll;
+ } else if (!strcasecmp(c->argv[2]->ptr,"appendfsync")) {
+ if (!strcasecmp(o->ptr,"no")) {
+ server.appendfsync = APPENDFSYNC_NO;
+ } else if (!strcasecmp(o->ptr,"everysec")) {
+ server.appendfsync = APPENDFSYNC_EVERYSEC;
+ } else if (!strcasecmp(o->ptr,"always")) {
+ server.appendfsync = APPENDFSYNC_ALWAYS;
+ } else {
+ goto badfmt;
+ }
+ } else if (!strcasecmp(c->argv[2]->ptr,"appendonly")) {
+ int old = server.appendonly;
+ int new = yesnotoi(o->ptr);
+
+ if (new == -1) goto badfmt;
+ if (old != new) {
+ if (new == 0) {
+ stopAppendOnly();
+ } else {
+ if (startAppendOnly() == REDIS_ERR) {
+ addReplySds(c,sdscatprintf(sdsempty(),
+ "-ERR Unable to turn on AOF. Check server logs.\r\n"));
+ decrRefCount(o);
+ return;
+ }
+ }
+ }
+ } else if (!strcasecmp(c->argv[2]->ptr,"save")) {
+ int vlen, j;
+ sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen);
+
+ /* Perform sanity check before setting the new config:
+ * - Even number of args
+ * - Seconds >= 1, changes >= 0 */
+ if (vlen & 1) {
+ sdsfreesplitres(v,vlen);
+ goto badfmt;
+ }
+ for (j = 0; j < vlen; j++) {
+ char *eptr;
+ long val;
+
+ val = strtoll(v[j], &eptr, 10);
+ if (eptr[0] != '\0' ||
+ ((j & 1) == 0 && val < 1) ||
+ ((j & 1) == 1 && val < 0)) {
+ sdsfreesplitres(v,vlen);
+ goto badfmt;
+ }
+ }
+ /* Finally set the new config */
+ resetServerSaveParams();
+ for (j = 0; j < vlen; j += 2) {
+ time_t seconds;
+ int changes;
+
+ seconds = strtoll(v[j],NULL,10);
+ changes = strtoll(v[j+1],NULL,10);
+ appendServerSaveParams(seconds, changes);
+ }
+ sdsfreesplitres(v,vlen);
+ } else {
+ addReplySds(c,sdscatprintf(sdsempty(),
+ "-ERR not supported CONFIG parameter %s\r\n",
+ (char*)c->argv[2]->ptr));
+ decrRefCount(o);
+ return;
+ }
+ decrRefCount(o);
+ addReply(c,shared.ok);
+ return;
+
+badfmt: /* Bad format errors */
+ addReplySds(c,sdscatprintf(sdsempty(),
+ "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n",
+ (char*)o->ptr,
+ (char*)c->argv[2]->ptr));
+ decrRefCount(o);
+}
+
+static void configGetCommand(redisClient *c) {
+ robj *o = getDecodedObject(c->argv[2]);
+ robj *lenobj = createObject(REDIS_STRING,NULL);
+ char *pattern = o->ptr;
+ int matches = 0;
+
+ addReply(c,lenobj);
+ decrRefCount(lenobj);
+
+ if (stringmatch(pattern,"dbfilename",0)) {
+ addReplyBulkCString(c,"dbfilename");
+ addReplyBulkCString(c,server.dbfilename);
+ matches++;
+ }
+ if (stringmatch(pattern,"requirepass",0)) {
+ addReplyBulkCString(c,"requirepass");
+ addReplyBulkCString(c,server.requirepass);
+ matches++;
+ }
+ if (stringmatch(pattern,"masterauth",0)) {
+ addReplyBulkCString(c,"masterauth");
+ addReplyBulkCString(c,server.masterauth);
+ matches++;
+ }
+ if (stringmatch(pattern,"maxmemory",0)) {
+ char buf[128];
+
+ ll2string(buf,128,server.maxmemory);
+ addReplyBulkCString(c,"maxmemory");
+ addReplyBulkCString(c,buf);
+ matches++;
+ }
+ if (stringmatch(pattern,"timeout",0)) {
+ char buf[128];
+
+ ll2string(buf,128,server.maxidletime);
+ addReplyBulkCString(c,"timeout");
+ addReplyBulkCString(c,buf);
+ matches++;
+ }
+ if (stringmatch(pattern,"appendonly",0)) {
+ addReplyBulkCString(c,"appendonly");
+ addReplyBulkCString(c,server.appendonly ? "yes" : "no");
+ matches++;
+ }
+ if (stringmatch(pattern,"appendfsync",0)) {
+ char *policy;
+
+ switch(server.appendfsync) {
+ case APPENDFSYNC_NO: policy = "no"; break;
+ case APPENDFSYNC_EVERYSEC: policy = "everysec"; break;
+ case APPENDFSYNC_ALWAYS: policy = "always"; break;
+ default: policy = "unknown"; break; /* too harmless to panic */
+ }
+ addReplyBulkCString(c,"appendfsync");
+ addReplyBulkCString(c,policy);
+ matches++;
+ }
+ if (stringmatch(pattern,"save",0)) {
+ sds buf = sdsempty();
+ int j;
+
+ for (j = 0; j < server.saveparamslen; j++) {
+ buf = sdscatprintf(buf,"%ld %d",
+ server.saveparams[j].seconds,
+ server.saveparams[j].changes);
+ if (j != server.saveparamslen-1)
+ buf = sdscatlen(buf," ",1);
+ }
+ addReplyBulkCString(c,"save");
+ addReplyBulkCString(c,buf);
+ sdsfree(buf);
+ matches++;
+ }
+ decrRefCount(o);
+ lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2);
+}
+
+static void configCommand(redisClient *c) {
+ if (!strcasecmp(c->argv[1]->ptr,"set")) {
+ if (c->argc != 4) goto badarity;
+ configSetCommand(c);
+ } else if (!strcasecmp(c->argv[1]->ptr,"get")) {
+ if (c->argc != 3) goto badarity;
+ configGetCommand(c);
+ } else if (!strcasecmp(c->argv[1]->ptr,"resetstat")) {
+ if (c->argc != 2) goto badarity;
+ server.stat_numcommands = 0;
+ server.stat_numconnections = 0;
+ server.stat_expiredkeys = 0;
+ server.stat_starttime = time(NULL);
+ addReply(c,shared.ok);
+ } else {
+ addReplySds(c,sdscatprintf(sdsempty(),
+ "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n"));
+ }
+ return;
+
+badarity:
+ addReplySds(c,sdscatprintf(sdsempty(),
+ "-ERR Wrong number of arguments for CONFIG %s\r\n",
+ (char*) c->argv[1]->ptr));
+}
+
+/* =========================== Pubsub implementation ======================== */
+
+static void freePubsubPattern(void *p) {
+ pubsubPattern *pat = p;
+
+ decrRefCount(pat->pattern);
+ zfree(pat);
+}
+
+static int listMatchPubsubPattern(void *a, void *b) {
+ pubsubPattern *pa = a, *pb = b;
+
+ return (pa->client == pb->client) &&
+ (equalStringObjects(pa->pattern,pb->pattern));
+}
+
+/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was already subscribed to that channel. */
+static int pubsubSubscribeChannel(redisClient *c, robj *channel) {
+ struct dictEntry *de;
+ list *clients = NULL;
+ int retval = 0;
+
+ /* Add the channel to the client -> channels hash table */
+ if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
+ retval = 1;
+ incrRefCount(channel);
+ /* Add the client to the channel -> list of clients hash table */
+ de = dictFind(server.pubsub_channels,channel);
+ if (de == NULL) {
+ clients = listCreate();
+ dictAdd(server.pubsub_channels,channel,clients);
+ incrRefCount(channel);
+ } else {
+ clients = dictGetEntryVal(de);
+ }
+ listAddNodeTail(clients,c);
+ }
+ /* Notify the client */
+ addReply(c,shared.mbulk3);
+ addReply(c,shared.subscribebulk);
+ addReplyBulk(c,channel);
+ addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
+ return retval;
+}
+
+/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was not subscribed to the specified channel. */
+static int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
+ struct dictEntry *de;
+ list *clients;
+ listNode *ln;
+ int retval = 0;
+
+ /* Remove the channel from the client -> channels hash table */
+ incrRefCount(channel); /* channel may be just a pointer to the same object
+ we have in the hash tables. Protect it... */
+ if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
+ retval = 1;
+ /* Remove the client from the channel -> clients list hash table */
+ de = dictFind(server.pubsub_channels,channel);
+ assert(de != NULL);
+ clients = dictGetEntryVal(de);
+ ln = listSearchKey(clients,c);
+ assert(ln != NULL);
+ listDelNode(clients,ln);
+ if (listLength(clients) == 0) {
+ /* Free the list and associated hash entry at all if this was
+ * the latest client, so that it will be possible to abuse
+ * Redis PUBSUB creating millions of channels. */
+ dictDelete(server.pubsub_channels,channel);
+ }
+ }
+ /* Notify the client */
+ if (notify) {
+ addReply(c,shared.mbulk3);
+ addReply(c,shared.unsubscribebulk);
+ addReplyBulk(c,channel);
+ addReplyLongLong(c,dictSize(c->pubsub_channels)+
+ listLength(c->pubsub_patterns));
+
+ }
+ decrRefCount(channel); /* it is finally safe to release it */
+ return retval;
+}
+
+/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */
+static int pubsubSubscribePattern(redisClient *c, robj *pattern) {
+ int retval = 0;
+
+ if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
+ retval = 1;
+ pubsubPattern *pat;
+ listAddNodeTail(c->pubsub_patterns,pattern);
+ incrRefCount(pattern);
+ pat = zmalloc(sizeof(*pat));
+ pat->pattern = getDecodedObject(pattern);
+ pat->client = c;
+ listAddNodeTail(server.pubsub_patterns,pat);
+ }
+ /* Notify the client */
+ addReply(c,shared.mbulk3);
+ addReply(c,shared.psubscribebulk);
+ addReplyBulk(c,pattern);
+ addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
+ return retval;
+}
+
+/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was not subscribed to the specified channel. */
+static int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
+ listNode *ln;
+ pubsubPattern pat;
+ int retval = 0;
+
+ incrRefCount(pattern); /* Protect the object. May be the same we remove */
+ if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
+ retval = 1;
+ listDelNode(c->pubsub_patterns,ln);
+ pat.client = c;
+ pat.pattern = pattern;
+ ln = listSearchKey(server.pubsub_patterns,&pat);
+ listDelNode(server.pubsub_patterns,ln);
+ }
+ /* Notify the client */
+ if (notify) {
+ addReply(c,shared.mbulk3);
+ addReply(c,shared.punsubscribebulk);
+ addReplyBulk(c,pattern);
+ addReplyLongLong(c,dictSize(c->pubsub_channels)+
+ listLength(c->pubsub_patterns));
+ }
+ decrRefCount(pattern);
+ return retval;
+}
+
+/* Unsubscribe from all the channels. Return the number of channels the
+ * client was subscribed from. */
+static int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
+ dictIterator *di = dictGetIterator(c->pubsub_channels);
+ dictEntry *de;
+ int count = 0;
+
+ while((de = dictNext(di)) != NULL) {
+ robj *channel = dictGetEntryKey(de);
+
+ count += pubsubUnsubscribeChannel(c,channel,notify);
+ }
+ dictReleaseIterator(di);
+ return count;
+}
+
+/* Unsubscribe from all the patterns. Return the number of patterns the
+ * client was subscribed from. */
+static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
+ listNode *ln;
+ listIter li;
+ int count = 0;
+
+ listRewind(c->pubsub_patterns,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ robj *pattern = ln->value;
+
+ count += pubsubUnsubscribePattern(c,pattern,notify);
+ }
+ return count;
+}
+
+/* Publish a message */
+static int pubsubPublishMessage(robj *channel, robj *message) {
+ int receivers = 0;
+ struct dictEntry *de;
+ listNode *ln;
+ listIter li;
+
+ /* Send to clients listening for that channel */
+ de = dictFind(server.pubsub_channels,channel);
+ if (de) {
+ list *list = dictGetEntryVal(de);
+ listNode *ln;
+ listIter li;
+
+ listRewind(list,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ redisClient *c = ln->value;
+
+ addReply(c,shared.mbulk3);
+ addReply(c,shared.messagebulk);
+ addReplyBulk(c,channel);
+ addReplyBulk(c,message);
+ receivers++;
+ }
+ }
+ /* Send to clients listening to matching channels */
+ if (listLength(server.pubsub_patterns)) {
+ listRewind(server.pubsub_patterns,&li);
+ channel = getDecodedObject(channel);
+ while ((ln = listNext(&li)) != NULL) {
+ pubsubPattern *pat = ln->value;
+
+ if (stringmatchlen((char*)pat->pattern->ptr,
+ sdslen(pat->pattern->ptr),
+ (char*)channel->ptr,
+ sdslen(channel->ptr),0)) {
+ addReply(pat->client,shared.mbulk4);
+ addReply(pat->client,shared.pmessagebulk);
+ addReplyBulk(pat->client,pat->pattern);
+ addReplyBulk(pat->client,channel);
+ addReplyBulk(pat->client,message);
+ receivers++;
+ }
+ }
+ decrRefCount(channel);
+ }
+ return receivers;
+}
+
+static void subscribeCommand(redisClient *c) {
+ int j;
+
+ for (j = 1; j < c->argc; j++)
+ pubsubSubscribeChannel(c,c->argv[j]);
+}
+
+static void unsubscribeCommand(redisClient *c) {
+ if (c->argc == 1) {
+ pubsubUnsubscribeAllChannels(c,1);
+ return;
+ } else {
+ int j;
+
+ for (j = 1; j < c->argc; j++)
+ pubsubUnsubscribeChannel(c,c->argv[j],1);
+ }
+}
+
+static void psubscribeCommand(redisClient *c) {
+ int j;
+
+ for (j = 1; j < c->argc; j++)
+ pubsubSubscribePattern(c,c->argv[j]);
+}
+
+static void punsubscribeCommand(redisClient *c) {
+ if (c->argc == 1) {
+ pubsubUnsubscribeAllPatterns(c,1);
+ return;
+ } else {
+ int j;
+
+ for (j = 1; j < c->argc; j++)
+ pubsubUnsubscribePattern(c,c->argv[j],1);
+ }
+}
+
+static void publishCommand(redisClient *c) {
+ int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
+ addReplyLongLong(c,receivers);
+}
+