config file and the server is using more than maxmemory bytes of memory.
In short this commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4
+#define REDIS_CMD_FORCE_REPLICATION 8 /* Force replication even if dirty is 0 */
/* Object types */
#define REDIS_STRING 0
* is >= blockingto then the operation timed out. */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
- dict *pubsub_classes; /* Classes a client is interested in (SUBSCRIBE) */
+ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
+ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
} redisClient;
struct saveparam {
int port;
int fd;
redisDb *db;
- dict *sharingpool; /* Poll used for object sharing */
- unsigned int sharingpoolsize;
long long dirty; /* changes to DB from the last save */
list *clients;
list *slaves, *monitors;
unsigned long long vm_stats_swapouts;
unsigned long long vm_stats_swapins;
/* Pubsub */
- dict *pubsub_classes; /* Associate classes to list of subscribed clients */
+ dict *pubsub_channels; /* Map channels to list of subscribed clients */
+ list *pubsub_patterns; /* A list of pubsub_patterns */
/* Misc */
FILE *devnull;
};
+/* One entry of server.pubsub_patterns: a single (client, pattern)
+ * subscription. A client subscribed to N patterns owns N such entries. */
+typedef struct pubsubPattern {
+    redisClient *client; /* the subscribed client (not owned by the entry) */
+    robj *pattern;       /* decoded copy of the pattern (see getDecodedObject) */
+} pubsubPattern;
+
typedef void redisCommandProc(redisClient *c);
struct redisCommand {
char *name;
/* Our shared "common" objects */
+#define REDIS_SHARED_INTEGERS 10000
struct sharedObjectsStruct {
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
*colon, *nullbulk, *nullmultibulk, *queued,
*outofrangeerr, *plus,
*select0, *select1, *select2, *select3, *select4,
*select5, *select6, *select7, *select8, *select9,
- *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3;
+ *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
+ *psubscribebulk, *punsubscribebulk, *integers[REDIS_SHARED_INTEGERS];
} shared;
/* Global vars that are actally used as constants. The following double
static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
-static robj *tryObjectSharing(robj *o);
-static int tryObjectEncoding(robj *o);
+static robj *tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static void call(redisClient *c, struct redisCommand *cmd);
static void resetClient(redisClient *c);
static void convertToRealHash(robj *o);
-static int pubsubUnsubscribeAll(redisClient *c, int notify);
+static int pubsubUnsubscribeAllChannels(redisClient *c, int notify);
+static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify);
+static void freePubsubPattern(void *p);
+static int listMatchPubsubPattern(void *a, void *b);
+static int compareStringObjects(robj *a, robj *b);
static void usage();
static void authCommand(redisClient *c);
static void zrankCommand(redisClient *c);
static void zrevrankCommand(redisClient *c);
static void hsetCommand(redisClient *c);
+static void hmsetCommand(redisClient *c);
static void hgetCommand(redisClient *c);
static void hdelCommand(redisClient *c);
static void hlenCommand(redisClient *c);
static void hincrbyCommand(redisClient *c);
static void subscribeCommand(redisClient *c);
static void unsubscribeCommand(redisClient *c);
+static void psubscribeCommand(redisClient *c);
+static void punsubscribeCommand(redisClient *c);
static void publishCommand(redisClient *c);
/*================================= Globals ================================= */
{"zrank",zrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
{"zrevrank",zrevrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
{"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
+ {"hmset",hmsetCommand,-4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
{"hincrby",hincrbyCommand,4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1},
{"hget",hgetCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
{"hdel",hdelCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
{"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{"type",typeCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
{"multi",multiCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
- {"exec",execCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
+ {"exec",execCommand,1,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,0,0,0},
{"discard",discardCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{"sync",syncCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{"flushdb",flushdbCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
{"subscribe",subscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
- {"publish",publishCommand,3,REDIS_CMD_BULK,NULL,0,0,0},
+ {"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
+ {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
+ {"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0},
{NULL,NULL,0,0,NULL,0,0,0}
};
if (server.maxidletime &&
!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
!(c->flags & REDIS_MASTER) && /* no timeout for masters */
- (now - c->lastinteraction > server.maxidletime))
+ dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
+ listLength(c->pubsub_patterns) == 0 &&
+ (now - c->lastinteraction > server.maxidletime))
{
redisLog(REDIS_VERBOSE,"Closing idle client");
freeClient(c);
redisLog(REDIS_WARNING, "Background saving error");
} else {
redisLog(REDIS_WARNING,
- "Background saving terminated by signal");
+ "Background saving terminated by signal %d", WTERMSIG(statloc));
rdbRemoveTempFile(server.bgsavechildpid);
}
server.bgsavechildpid = -1;
redisLog(REDIS_WARNING, "Background append only file rewriting error");
} else {
redisLog(REDIS_WARNING,
- "Background append only file rewriting terminated by signal");
+ "Background append only file rewriting terminated by signal %d",
+ WTERMSIG(statloc));
}
cleanup:
sdsfree(server.bgrewritebuf);
server.bgrewritechildpid = -1;
}
+/* Called whenever a background child (BGSAVE or BGREWRITEAOF) is started or
+ * terminates. While a child is running we avoid resizing the hash tables in
+ * order to play well with copy-on-write: a resize touches lots of memory
+ * pages, which would then have to be copied. This function tells dict.c
+ * whether resizing is currently allowed, according to whether or not a
+ * child is running. */
+static void updateDictResizePolicy(void) {
+    int haveChild = server.bgsavechildpid != -1 ||
+                    server.bgrewritechildpid != -1;
+
+    if (haveChild)
+        dictDisableResize();
+    else
+        dictEnableResize();
+}
+
static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j, loops = server.cronloops++;
REDIS_NOTUSED(eventLoop);
* if we resize the HT while there is the saving child at work actually
* a lot of memory movements in the parent will cause a lot of pages
* copied. */
- if (server.bgsavechildpid == -1 && !(loops % 10)) tryResizeHashTables();
+ if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1 &&
+ !(loops % 10))
+ {
+ tryResizeHashTables();
+ }
/* Show information about connected clients */
if (!(loops % 50)) {
- redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
+ redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
- zmalloc_used_memory(),
- dictSize(server.sharingpool));
+ zmalloc_used_memory());
}
/* Close connections of timedout clients */
} else {
backgroundRewriteDoneHandler(statloc);
}
+ updateDictResizePolicy();
}
} else {
/* If there is not a background saving in progress check if
}
static void createSharedObjects(void) {
+ int j;
+
shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
+ shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
+ shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
shared.mbulk3 = createStringObject("*3\r\n",4);
+ for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
+ shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
+ shared.integers[j]->encoding = REDIS_ENCODING_INT;
+ }
}
static void appendServerSaveParams(time_t seconds, int changes) {
server.requirepass = NULL;
server.shareobjects = 0;
server.rdbcompression = 1;
- server.sharingpoolsize = 1024;
server.maxclients = 0;
server.blpop_blocked_clients = 0;
server.maxmemory = 0;
createSharedObjects();
server.el = aeCreateEventLoop();
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
- server.sharingpool = dictCreate(&setDictType,NULL);
server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
if (server.fd == -1) {
redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
}
- server.pubsub_classes = dictCreate(&keylistDictType,NULL);
+ server.pubsub_channels = dictCreate(&keylistDictType,NULL);
+ server.pubsub_patterns = listCreate();
+ listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
+ listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
server.cronloops = 0;
server.bgsavechildpid = -1;
server.bgrewritechildpid = -1;
if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
- } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
- server.sharingpoolsize = atoi(argv[1]);
- if (server.sharingpoolsize < 1) {
- err = "invalid object sharing pool size"; goto loaderr;
- }
} else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
if ((server.daemonize = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
if (c->flags & REDIS_BLOCKED)
unblockClientWaitingData(c);
- /* Unsubscribe from all the pubsub classes */
- pubsubUnsubscribeAll(c,0);
- dictRelease(c->pubsub_classes);
+ /* Unsubscribe from all the pubsub channels */
+ pubsubUnsubscribeAllChannels(c,0);
+ pubsubUnsubscribeAllPatterns(c,0);
+ dictRelease(c->pubsub_channels);
+ listRelease(c->pubsub_patterns);
/* Obvious cleanup */
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
dirty = server.dirty;
cmd->proc(c);
- if (server.appendonly && server.dirty-dirty)
+ dirty = server.dirty-dirty;
+
+ if (server.appendonly && dirty)
feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
- if (server.dirty-dirty && listLength(server.slaves))
+ if ((dirty || cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
+ listLength(server.slaves))
replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
if (listLength(server.monitors))
replicationFeedSlaves(server.monitors,c->db->id,c->argv,c->argc);
return 1;
}
}
- /* Let's try to share objects on the command arguments vector */
- if (server.shareobjects) {
- int j;
- for(j = 1; j < c->argc; j++)
- c->argv[j] = tryObjectSharing(c->argv[j]);
- }
/* Let's try to encode the bulk object to save space. */
if (cmd->flags & REDIS_CMD_BULK)
- tryObjectEncoding(c->argv[c->argc-1]);
+ c->argv[c->argc-1] = tryObjectEncoding(c->argv[c->argc-1]);
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
return 1;
}
+    /* Only allow (P)SUBSCRIBE and (P)UNSUBSCRIBE in the context of Pub/Sub.
+     * A client is in Pub/Sub context as soon as it is subscribed to at least
+     * one channel OR at least one pattern: checking only the channels would
+     * let a client subscribed just to patterns run arbitrary commands while
+     * it is receiving messages. */
+    if ((dictSize(c->pubsub_channels) > 0 ||
+         listLength(c->pubsub_patterns) > 0) &&
+        cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
+        cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
+        addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n"));
+        resetClient(c);
+        return 1;
+    }
+
/* Exec the command */
if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
queueMultiCommand(c,cmd);
return o;
}
+/* List match method comparing two string objects by value; used for the
+ * per-client list of subscribed patterns. Returns 1 when they are equal. */
+static int listMatchObjects(void *a, void *b) {
+    robj *x = a, *y = b;
+
+    return !compareStringObjects(x,y);
+}
+
static redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(*c));
c->blockingkeys = NULL;
c->blockingkeysnum = 0;
c->io_keys = listCreate();
- c->pubsub_classes = dictCreate(&setDictType,NULL);
listSetFreeMethod(c->io_keys,decrRefCount);
+ c->pubsub_channels = dictCreate(&setDictType,NULL);
+ c->pubsub_patterns = listCreate();
+ listSetFreeMethod(c->pubsub_patterns,decrRefCount);
+ listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c) == AE_ERR) {
freeClient(c);
addReplySds(c,sdsnewlen(buf,len));
}
+/* Append an integer reply (":<n>\r\n") for a long long value. The values 0
+ * and 1 reuse the preallocated shared replies instead of building a string. */
+static void addReplyLongLong(redisClient *c, long long ll) {
+    char buf[128];
+    size_t len;
+
+    if (ll == 0) {
+        addReply(c,shared.czero);
+    } else if (ll == 1) {
+        addReply(c,shared.cone);
+    } else {
+        len = snprintf(buf,sizeof(buf),":%lld\r\n",ll);
+        addReplySds(c,sdsnewlen(buf,len));
+    }
+}
+
static void addReplyUlong(redisClient *c, unsigned long ul) {
char buf[128];
size_t len;
}
static void incrRefCount(robj *o) {
- redisAssert(!server.vm_enabled || o->storage == REDIS_VM_MEMORY);
o->refcount++;
}
if (server.vm_enabled &&
(o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
{
- if (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING) {
- redisAssert(o->refcount == 1);
- }
if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(obj);
redisAssert(o->type == REDIS_STRING);
freeStringObject(o);
return retval == DICT_OK;
}
-/* Try to share an object against the shared objects pool */
-static robj *tryObjectSharing(robj *o) {
- struct dictEntry *de;
- unsigned long c;
-
- if (o == NULL || server.shareobjects == 0) return o;
-
- redisAssert(o->type == REDIS_STRING);
- de = dictFind(server.sharingpool,o);
- if (de) {
- robj *shared = dictGetEntryKey(de);
-
- c = ((unsigned long) dictGetEntryVal(de))+1;
- dictGetEntryVal(de) = (void*) c;
- incrRefCount(shared);
- decrRefCount(o);
- return shared;
- } else {
- /* Here we are using a stream algorihtm: Every time an object is
- * shared we increment its count, everytime there is a miss we
- * recrement the counter of a random object. If this object reaches
- * zero we remove the object and put the current object instead. */
- if (dictSize(server.sharingpool) >=
- server.sharingpoolsize) {
- de = dictGetRandomKey(server.sharingpool);
- redisAssert(de != NULL);
- c = ((unsigned long) dictGetEntryVal(de))-1;
- dictGetEntryVal(de) = (void*) c;
- if (c == 0) {
- dictDelete(server.sharingpool,de->key);
- }
- } else {
- c = 0; /* If the pool is empty we want to add this object */
- }
- if (c == 0) {
- int retval;
-
- retval = dictAdd(server.sharingpool,o,(void*)1);
- redisAssert(retval == DICT_OK);
- incrRefCount(o);
- }
- return o;
- }
-}
-
/* Check if the nul-terminated string 's' can be represented by a long
* (that is, is a number that fits into long without any other space or
* character before or after the digits).
}
/* Try to encode a string object in order to save space */
-static int tryObjectEncoding(robj *o) {
+static robj *tryObjectEncoding(robj *o) {
long value;
sds s = o->ptr;
if (o->encoding != REDIS_ENCODING_RAW)
- return REDIS_ERR; /* Already encoded */
+ return o; /* Already encoded */
- /* It's not save to encode shared objects: shared objects can be shared
+ /* It's not safe to encode shared objects: shared objects can be shared
* everywhere in the "object space" of Redis. Encoded objects can only
* appear as "values" (and not, for instance, as keys) */
- if (o->refcount > 1) return REDIS_ERR;
+ if (o->refcount > 1) return o;
/* Currently we try to encode only strings */
redisAssert(o->type == REDIS_STRING);
/* Check if we can represent this string as a long integer */
- if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
+ if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return o;
/* Ok, this object can be encoded */
- o->encoding = REDIS_ENCODING_INT;
- sdsfree(o->ptr);
- o->ptr = (void*) value;
- return REDIS_OK;
+ if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
+ decrRefCount(o);
+ incrRefCount(shared.integers[value]);
+ return shared.integers[value];
+ } else {
+ o->encoding = REDIS_ENCODING_INT;
+ sdsfree(o->ptr);
+ o->ptr = (void*) value;
+ return o;
+ }
}
/* Get a decoded version of an encoded object (returned as a new object).
}
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
server.bgsavechildpid = childpid;
+ updateDictResizePolicy();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
case REDIS_RDB_ENC_INT8:
case REDIS_RDB_ENC_INT16:
case REDIS_RDB_ENC_INT32:
- return tryObjectSharing(rdbLoadIntegerObject(fp,len));
+ return rdbLoadIntegerObject(fp,len);
case REDIS_RDB_ENC_LZF:
- return tryObjectSharing(rdbLoadLzfStringObject(fp));
+ return rdbLoadLzfStringObject(fp);
default:
redisAssert(0);
}
sdsfree(val);
return NULL;
}
- return tryObjectSharing(createObject(REDIS_STRING,val));
+ return createObject(REDIS_STRING,val);
}
/* For information about double serialization check rdbSaveDoubleValue() */
if (type == REDIS_STRING) {
/* Read string value */
if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
- tryObjectEncoding(o);
+ o = tryObjectEncoding(o);
} else if (type == REDIS_LIST || type == REDIS_SET) {
/* Read list/set value */
uint32_t listlen;
robj *ele;
if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
- tryObjectEncoding(ele);
+ ele = tryObjectEncoding(ele);
if (type == REDIS_LIST) {
listAddNodeTail((list*)o->ptr,ele);
} else {
double *score = zmalloc(sizeof(double));
if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
- tryObjectEncoding(ele);
+ ele = tryObjectEncoding(ele);
if (rdbLoadDoubleValue(fp,score) == -1) return NULL;
dictAdd(zs->dict,ele,score);
zslInsert(zs->zsl,*score,ele);
decrRefCount(key);
decrRefCount(val);
} else {
- tryObjectEncoding(key);
- tryObjectEncoding(val);
+ key = tryObjectEncoding(key);
+ val = tryObjectEncoding(val);
dictAdd((dict*)o->ptr,key,val);
}
}
for (j = 1; j < c->argc; j += 2) {
int retval;
- tryObjectEncoding(c->argv[j+1]);
+ c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
if (retval == DICT_ERR) {
dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
value += incr;
o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
- tryObjectEncoding(o);
+ o = tryObjectEncoding(o);
retval = dictAdd(c->db->dict,c->argv[1],o);
if (retval == DICT_ERR) {
dictReplace(c->db->dict,c->argv[1],o);
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
- return level;
+ return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
static void zslInsert(zskiplist *zsl, double score, robj *obj) {
addReplyLong(c, dstzset->zsl->length);
server.dirty++;
} else {
- decrRefCount(dstzset);
+ decrRefCount(dstobj);
addReply(c, shared.czero);
}
zfree(src);
decrRefCount(valobj);
o->ptr = zm;
- /* And here there is the second check for hash conversion...
- * we want to do it only if the operation was not just an update as
- * zipmapLen() is O(N). */
- if (!update && zipmapLen(zm) > server.hash_max_zipmap_entries)
+ /* And here there is the second check for hash conversion. */
+ if (zipmapLen(zm) > server.hash_max_zipmap_entries)
convertToRealHash(o);
} else {
- tryObjectEncoding(c->argv[2]);
+ c->argv[2] = tryObjectEncoding(c->argv[2]);
/* note that c->argv[3] is already encoded, as the latest arg
* of a bulk command is always integer encoded if possible. */
if (dictReplace(o->ptr,c->argv[2],c->argv[3])) {
addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",update == 0));
}
+/* HMSET key field value [field value ...]
+ *
+ * Set several fields of the hash stored at key in one call, creating the
+ * hash if the key does not exist. Replies +OK, or a wrong-type error if the
+ * key holds a value that is not a hash.
+ *
+ * A zipmap-encoded hash is converted to a real hash table before writing if
+ * any field or value is longer than hash_max_zipmap_value, and after writing
+ * if it ends up with more than hash_max_zipmap_entries entries. */
+static void hmsetCommand(redisClient *c) {
+    int i;
+    robj *o, *key, *val;
+
+    /* Arguments after the key must come in field/value pairs, so argc
+     * (command name + key + 2*N) must be even. */
+    if ((c->argc % 2) == 1) {
+        addReplySds(c,sdsnew("-ERR wrong number of arguments for HMSET\r\n"));
+        return;
+    }
+
+    if ((o = lookupKeyWrite(c->db,c->argv[1])) == NULL) {
+        o = createHashObject();
+        dictAdd(c->db->dict,c->argv[1],o);
+        incrRefCount(c->argv[1]);
+    } else {
+        if (o->type != REDIS_HASH) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+    }
+
+    /* We want to convert the zipmap into an hash table right now if any
+     * entry to be added is too big. */
+    if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+        for (i = 2; i < c->argc; i+=2) {
+            if ((c->argv[i]->encoding == REDIS_ENCODING_RAW &&
+                  sdslen(c->argv[i]->ptr) > server.hash_max_zipmap_value) ||
+                (c->argv[i+1]->encoding == REDIS_ENCODING_RAW &&
+                  sdslen(c->argv[i+1]->ptr) > server.hash_max_zipmap_value)) {
+                convertToRealHash(o);
+                break;
+            }
+        }
+    }
+
+    if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+        unsigned char *zm = o->ptr;
+
+        for (i = 2; i < c->argc; i+=2) {
+            key = getDecodedObject(c->argv[i]);
+            val = getDecodedObject(c->argv[i+1]);
+            zm = zipmapSet(zm,key->ptr,sdslen(key->ptr),
+                              val->ptr,sdslen(val->ptr),NULL);
+            decrRefCount(key);
+            decrRefCount(val);
+            o->ptr = zm;
+        }
+
+        /* And here there is the second check for hash conversion. */
+        if (zipmapLen(zm) > server.hash_max_zipmap_entries)
+            convertToRealHash(o);
+    } else {
+        for (i = 2; i < c->argc; i+=2) {
+            /* tryObjectEncoding() may release its argument and return a
+             * shared integer object instead, so the result must be stored
+             * back into argv (as hsetCommand does): otherwise argv would
+             * keep a pointer to an object that may already be freed. */
+            c->argv[i] = tryObjectEncoding(c->argv[i]);
+            c->argv[i+1] = tryObjectEncoding(c->argv[i+1]);
+            key = c->argv[i];
+            val = c->argv[i+1];
+            /* As in hincrbyCommand, a non-zero dictReplace() result means
+             * the field was newly added and the dict now references the
+             * key; the value is stored in both cases. */
+            if (dictReplace(o->ptr,key,val)) {
+                incrRefCount(key);
+            }
+            incrRefCount(val);
+        }
+    }
+
+    /* Mark the dataset as modified: call() only feeds the append only file
+     * and the slaves when server.dirty changed. */
+    server.dirty++;
+    addReply(c, shared.ok);
+}
+
static void hincrbyCommand(redisClient *c) {
- int update = 0;
long long value = 0, incr = 0;
robj *o = lookupKeyWrite(c->db,c->argv[1]);
}
}
- robj *o_incr = getDecodedObject(c->argv[3]);
- incr = strtoll(o_incr->ptr, NULL, 10);
- decrRefCount(o_incr);
-
+ incr = strtoll(c->argv[3]->ptr, NULL, 10);
if (o->encoding == REDIS_ENCODING_ZIPMAP) {
unsigned char *zm = o->ptr;
unsigned char *zval;
value += incr;
sds svalue = sdscatprintf(sdsempty(),"%lld",value);
zm = zipmapSet(zm,c->argv[2]->ptr,sdslen(c->argv[2]->ptr),
- (unsigned char*)svalue,sdslen(svalue),&update);
+ (unsigned char*)svalue,sdslen(svalue),NULL);
sdsfree(svalue);
o->ptr = zm;
- /* Check if the zipmap needs to be converted
- * if this was not an update. */
- if (!update && zipmapLen(zm) > server.hash_max_zipmap_entries)
+ /* Check if the zipmap needs to be converted. */
+ if (zipmapLen(zm) > server.hash_max_zipmap_entries)
convertToRealHash(o);
} else {
robj *hval;
value += incr;
hval = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
- tryObjectEncoding(hval);
+ hval = tryObjectEncoding(hval);
if (dictReplace(o->ptr,c->argv[2],hval)) {
incrRefCount(c->argv[2]);
}
}
server.dirty++;
- addReplyLong(c, value);
+ addReplyLongLong(c, value);
}
static void hgetCommand(redisClient *c) {
keyobj = createStringObject((char*)key,klen);
valobj = createStringObject((char*)val,vlen);
- tryObjectEncoding(keyobj);
- tryObjectEncoding(valobj);
+ keyobj = tryObjectEncoding(keyobj);
+ valobj = tryObjectEncoding(valobj);
dictAdd(dict,keyobj,valobj);
}
o->encoding = REDIS_ENCODING_HT;
"expired_keys:%lld\r\n"
"hash_max_zipmap_entries:%ld\r\n"
"hash_max_zipmap_value:%ld\r\n"
- "pubsub_classes:%ld\r\n"
+ "pubsub_channels:%ld\r\n"
+ "pubsub_patterns:%u\r\n"
"vm_enabled:%d\r\n"
"role:%s\r\n"
,REDIS_VERSION,
server.stat_expiredkeys,
server.hash_max_zipmap_entries,
server.hash_max_zipmap_value,
- dictSize(server.pubsub_classes),
+ dictSize(server.pubsub_channels),
+ listLength(server.pubsub_patterns),
server.vm_enabled != 0,
server.masterhost == NULL ? "master" : "slave"
);
redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
exit(1);
}
- /* Try object sharing and encoding */
- if (server.shareobjects) {
- int j;
- for(j = 1; j < argc; j++)
- argv[j] = tryObjectSharing(argv[j]);
- }
+ /* Try object encoding */
if (cmd->flags & REDIS_CMD_BULK)
- tryObjectEncoding(argv[argc-1]);
+ argv[argc-1] = tryObjectEncoding(argv[argc-1]);
/* Run the command in the context of a fake client */
fakeClient->argc = argc;
fakeClient->argv = argv;
redisLog(REDIS_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
server.bgrewritechildpid = childpid;
+ updateDictResizePolicy();
/* We set appendseldb to -1 in order to force the next call to the
* feedAppendOnlyFile() to issue a SELECT command, so the differences
* accumulated by the parent into server.bgrewritebuf will start
j->type == REDIS_IOJOB_DO_SWAP ||
j->type == REDIS_IOJOB_LOAD) && j->val != NULL)
decrRefCount(j->val);
- decrRefCount(j->key);
+    /* We don't decrRefCount the j->key field as we didn't increment its
+     * count when creating IO Jobs. This is because the key field here is
+     * just used as an identifier, and if a key is removed the Job should
+     * never be touched again. */
zfree(j);
}
iojob *job = ln->value;
if (job->canceled) continue; /* Skip this, already canceled. */
- if (compareStringObjects(job->key,o) == 0) {
+ if (job->key == o) {
redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n",
(void*)job, (char*)o->ptr, job->type, i);
/* Mark the pages as free since the swap didn't happened
j = zmalloc(sizeof(*j));
j->type = REDIS_IOJOB_PREPARE_SWAP;
j->db = db;
- j->key = dupStringObject(key);
+ j->key = key;
j->val = val;
incrRefCount(val);
j->canceled = 0;
j = zmalloc(sizeof(*j));
j->type = REDIS_IOJOB_LOAD;
j->db = c->db;
- j->key = dupStringObject(key);
+ j->key = o;
j->key->vtype = o->vtype;
j->page = o->vm.page;
j->val = NULL;
/* =========================== Pubsub implementation ======================== */
-/* Subscribe a client to a class. Returns 1 if the operation succeeded, or
- * 0 if the client was already subscribed to that class. */
-static int pubsubSubscribe(redisClient *c, robj *class) {
+/* List free method for server.pubsub_patterns: drops the reference the
+ * entry holds on its pattern, then frees the entry itself. The client the
+ * entry points to is not released here. */
+static void freePubsubPattern(void *p) {
+    pubsubPattern *entry = (pubsubPattern *) p;
+    robj *pattern = entry->pattern;
+
+    decrRefCount(pattern);
+    zfree(entry);
+}
+
+/* List match method for server.pubsub_patterns: two entries match when they
+ * belong to the same client and their patterns compare equal as strings. */
+static int listMatchPubsubPattern(void *a, void *b) {
+    pubsubPattern *pa = a, *pb = b;
+
+    if (pa->client != pb->client) return 0;
+    return compareStringObjects(pa->pattern,pb->pattern) == 0;
+}
+
+/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was already subscribed to that channel. */
+static int pubsubSubscribeChannel(redisClient *c, robj *channel) {
struct dictEntry *de;
list *clients = NULL;
int retval = 0;
- /* Add the class to the client -> classes hash table */
- if (dictAdd(c->pubsub_classes,class,NULL) == DICT_OK) {
+ /* Add the channel to the client -> channels hash table */
+ if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
- incrRefCount(class);
- /* Add the client to the class -> list of clients hash table */
- de = dictFind(server.pubsub_classes,class);
+ incrRefCount(channel);
+ /* Add the client to the channel -> list of clients hash table */
+ de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
- dictAdd(server.pubsub_classes,class,clients);
- incrRefCount(class);
+ dictAdd(server.pubsub_channels,channel,clients);
+ incrRefCount(channel);
} else {
clients = dictGetEntryVal(de);
}
/* Notify the client */
addReply(c,shared.mbulk3);
addReply(c,shared.subscribebulk);
- addReplyBulk(c,class);
- addReplyLong(c,dictSize(c->pubsub_classes));
+ addReplyBulk(c,channel);
+ addReplyLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
return retval;
}
-/* Unsubscribe a client from a class. Returns 1 if the operation succeeded, or
- * 0 if the client was not subscribed to the specified class. */
-static int pubsubUnsubscribe(redisClient *c, robj *class, int notify) {
+/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
+ * 0 if the client was not subscribed to the specified channel.
+ *
+ * When notify is non-zero an unsubscribe reply is sent to the client (even
+ * if it was not subscribed), carrying the number of channels plus patterns
+ * it is still subscribed to. */
+static int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
     struct dictEntry *de;
     list *clients;
     listNode *ln;
     int retval = 0;
-    /* Remove the class from the client -> classes hash table */
-    if (dictDelete(c->pubsub_classes,class) == DICT_OK) {
+    /* Remove the channel from the client -> channels hash table */
+    incrRefCount(channel); /* channel may be just a pointer to the same object
+                            we have in the hash tables. Protect it... */
+    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
         retval = 1;
-        /* Remove the client from the class -> clients list hash table */
-        de = dictFind(server.pubsub_classes,class);
+        /* Remove the client from the channel -> clients list hash table */
+        de = dictFind(server.pubsub_channels,channel);
         assert(de != NULL);
         clients = dictGetEntryVal(de);
         ln = listSearchKey(clients,c);
         assert(ln != NULL);
         listDelNode(clients,ln);
+        if (listLength(clients) == 0) {
+            /* Free the list and the associated hash entry altogether if
+             * this was the last client, so that it will NOT be possible to
+             * abuse Redis PUBSUB by creating millions of channels that
+             * nobody is subscribed to anymore. */
+            dictDelete(server.pubsub_channels,channel);
+        }
     }
     /* Notify the client */
     if (notify) {
         addReply(c,shared.mbulk3);
         addReply(c,shared.unsubscribebulk);
-        addReplyBulk(c,class);
-        addReplyLong(c,dictSize(c->pubsub_classes));
+        addReplyBulk(c,channel);
+        addReplyLong(c,dictSize(c->pubsub_channels)+
+                       listLength(c->pubsub_patterns));
+
+    }
+    decrRefCount(channel); /* it is finally safe to release it */
+    return retval;
+}
+
+/* Subscribe a client to a pattern. Returns 1 if the pattern was added, or 0
+ * if the client was already subscribed to it. In both cases a psubscribe
+ * reply is sent carrying the total number of channels plus patterns the
+ * client is subscribed to. */
+static int pubsubSubscribePattern(redisClient *c, robj *pattern) {
+    int added = 0;
+
+    if (!listSearchKey(c->pubsub_patterns,pattern)) {
+        pubsubPattern *entry;
+
+        added = 1;
+        /* Client side: remember the pattern object itself. */
+        listAddNodeTail(c->pubsub_patterns,pattern);
+        incrRefCount(pattern);
+        /* Server side: a (client, pattern) pair holding a decoded copy of
+         * the pattern, scanned by pubsubPublishMessage(). */
+        entry = zmalloc(sizeof(*entry));
+        entry->pattern = getDecodedObject(pattern);
+        entry->client = c;
+        listAddNodeTail(server.pubsub_patterns,entry);
+    }
+
+    /* Reply: psubscribe, <pattern>, <subscription count> */
+    addReply(c,shared.mbulk3);
+    addReply(c,shared.psubscribebulk);
+    addReplyBulk(c,pattern);
+    addReplyLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
+    return added;
+}
+
+/* Unsubscribe a client from a pattern. Returns 1 if the operation succeeded,
+ * or 0 if the client was not subscribed to the specified pattern.
+ *
+ * The pattern is removed both from the client's own list and from the
+ * server-wide list of (client, pattern) entries. When notify is non-zero a
+ * punsubscribe reply is sent carrying the number of channels plus patterns
+ * the client is still subscribed to. */
+static int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
+    listNode *ln;
+    pubsubPattern pat;
+    int retval = 0;
+
+    incrRefCount(pattern); /* Protect the object. May be the same we remove */
+    if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
+        retval = 1;
+        listDelNode(c->pubsub_patterns,ln);
+        /* Remove the matching (client, pattern) entry from the server list.
+         * pubsubSubscribePattern() always adds to both lists, so it must be
+         * there; fail loudly instead of passing NULL to listDelNode(). */
+        pat.client = c;
+        pat.pattern = pattern;
+        ln = listSearchKey(server.pubsub_patterns,&pat);
+        assert(ln != NULL);
+        listDelNode(server.pubsub_patterns,ln);
+    }
+    /* Notify the client */
+    if (notify) {
+        addReply(c,shared.mbulk3);
+        addReply(c,shared.punsubscribebulk);
+        addReplyBulk(c,pattern);
+        addReplyLong(c,dictSize(c->pubsub_channels)+
+                       listLength(c->pubsub_patterns));
     }
+    decrRefCount(pattern);
     return retval;
 }
-/* Unsubscribe from all the classes. Return the number of classes the
- * client was subscribed to. */
-static int pubsubUnsubscribeAll(redisClient *c, int notify) {
- dictIterator *di = dictGetIterator(c->pubsub_classes);
+/* Unsubscribe from all the channels. Return the number of channels the
+ * client was subscribed from. */
+static int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
+ dictIterator *di = dictGetIterator(c->pubsub_channels);
dictEntry *de;
int count = 0;
while((de = dictNext(di)) != NULL) {
- robj *class = dictGetEntryKey(de);
+ robj *channel = dictGetEntryKey(de);
- count += pubsubUnsubscribe(c,class,notify);
+ count += pubsubUnsubscribeChannel(c,channel,notify);
}
dictReleaseIterator(di);
return count;
}
+/* Unsubscribe the client from every pattern it is subscribed to, sending a
+ * punsubscribe notification for each one when notify is non-zero. Returns
+ * the number of patterns the client was unsubscribed from. */
+static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
+    listIter iter;
+    listNode *node;
+    int removed = 0;
+
+    /* pubsubUnsubscribePattern() deletes the current node; this relies on
+     * listNext() having already advanced the iterator past it (adlist.c
+     * behavior - verify if the list implementation changes). */
+    listRewind(c->pubsub_patterns,&iter);
+    while ((node = listNext(&iter)) != NULL)
+        removed += pubsubUnsubscribePattern(c,(robj*)node->value,notify);
+    return removed;
+}
+
/* Publish a message */
-static int pubsubPublishMessage(robj *class, robj *message) {
+static int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
struct dictEntry *de;
+ listNode *ln;
+ listIter li;
- de = dictFind(server.pubsub_classes,class);
+ /* Send to clients listening for that channel */
+ de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetEntryVal(de);
listNode *ln;
addReply(c,shared.mbulk3);
addReply(c,shared.messagebulk);
- addReplyBulk(c,class);
+ addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
+ /* Send to clients listening to matching channels */
+ if (listLength(server.pubsub_patterns)) {
+ listRewind(server.pubsub_patterns,&li);
+ channel = getDecodedObject(channel);
+ while ((ln = listNext(&li)) != NULL) {
+ pubsubPattern *pat = ln->value;
+
+ if (stringmatchlen((char*)pat->pattern->ptr,
+ sdslen(pat->pattern->ptr),
+ (char*)channel->ptr,
+ sdslen(channel->ptr),0)) {
+ addReply(pat->client,shared.mbulk3);
+ addReply(pat->client,shared.messagebulk);
+ addReplyBulk(pat->client,channel);
+ addReplyBulk(pat->client,message);
+ receivers++;
+ }
+ }
+ decrRefCount(channel);
+ }
return receivers;
}
int j;
for (j = 1; j < c->argc; j++)
- pubsubSubscribe(c,c->argv[j]);
+ pubsubSubscribeChannel(c,c->argv[j]);
}
static void unsubscribeCommand(redisClient *c) {
if (c->argc == 1) {
- pubsubUnsubscribeAll(c,1);
+ pubsubUnsubscribeAllChannels(c,1);
return;
} else {
int j;
for (j = 1; j < c->argc; j++)
- pubsubUnsubscribe(c,c->argv[j],1);
+ pubsubUnsubscribeChannel(c,c->argv[j],1);
+ }
+}
+
+/* PSUBSCRIBE pattern [pattern ...]: subscribe to every given pattern. */
+static void psubscribeCommand(redisClient *c) {
+    int j = 1;
+
+    while (j < c->argc) {
+        pubsubSubscribePattern(c,c->argv[j]);
+        j++;
+    }
+}
+
+/* PUNSUBSCRIBE [pattern ...]: unsubscribe from the given patterns, or from
+ * all of the client's patterns when no argument is given. */
+static void punsubscribeCommand(redisClient *c) {
+    if (c->argc != 1) {
+        int j;
+
+        for (j = 1; j < c->argc; j++)
+            pubsubUnsubscribePattern(c,c->argv[j],1);
+    } else {
+        pubsubUnsubscribeAllPatterns(c,1);
     }
 }
(void*)key, key->refcount, (unsigned long long) key->vm.page,
(unsigned long long) key->vm.usedpages));
}
+ } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
+ lookupKeyRead(c->db,c->argv[2]);
+ addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
dictEntry *de = dictFind(c->db->dict,c->argv[2]);
robj *key, *val;
}
} else {
addReplySds(c,sdsnew(
- "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPOUT <key>|RELOAD]\r\n"));
+ "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]\r\n"));
}
}