#include <float.h>
#include <math.h>
#include <pthread.h>
+#include <sys/resource.h>
/* Our shared "common" objects */
{"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
+ {"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
{"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
{"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
{"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
NULL, /* val dup */
dictEncObjKeyCompare, /* key compare */
dictRedisObjectDestructor, /* key destructor */
- dictVanillaFree /* val destructor of malloc(sizeof(double)) */
+ NULL /* val destructor */
};
/* Db->dict, keys are sds strings, vals are Redis objects. */
/* ======================= Cron: called every 100 ms ======================== */
+/* Actively reclaim keys whose expire time has already passed.
+ *
+ * For every database, up to REDIS_EXPIRELOOKUPS_PER_CRON random entries of
+ * the expires dictionary are sampled; each one that is already expired is
+ * passed to propagateExpire() and then deleted with dbDelete(). When a round
+ * reaps more than REDIS_EXPIRELOOKUPS_PER_CRON/4 keys, another round is run
+ * right away on the same database. This keeps the CPU cost low when few
+ * keys are stale and makes the cycle more aggressive when many are. */
+void activeExpireCycle(void) {
+    int dbid;
+
+    for (dbid = 0; dbid < server.dbnum; dbid++) {
+        redisDb *db = server.db+dbid;
+
+        for (;;) {
+            long budget = dictSize(db->expires);
+            time_t now = time(NULL);
+            int reaped = 0;
+
+            if (budget > REDIS_EXPIRELOOKUPS_PER_CRON)
+                budget = REDIS_EXPIRELOOKUPS_PER_CRON;
+
+            for (; budget != 0; budget--) {
+                dictEntry *de = dictGetRandomKey(db->expires);
+                sds key;
+                robj *keyobj;
+
+                if (de == NULL) break;
+                /* Still alive: the stored value is the expire time. */
+                if (now <= (time_t) dictGetEntryVal(de)) continue;
+
+                key = dictGetEntryKey(de);
+                keyobj = createStringObject(key,sdslen(key));
+                propagateExpire(db,keyobj);
+                dbDelete(db,keyobj);
+                decrRefCount(keyobj);
+                reaped++;
+                server.stat_expiredkeys++;
+            }
+
+            /* Move on to the next DB once a round reaps 25% (of the
+             * per-round lookup budget) or fewer keys. */
+            if (reaped <= REDIS_EXPIRELOOKUPS_PER_CRON/4) break;
+        }
+    }
+}
+
+
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j, loops = server.cronloops++;
REDIS_NOTUSED(eventLoop);
}
}
- /* Try to expire a few timed out keys. The algorithm used is adaptive and
- * will use few CPU cycles if there are few expiring keys, otherwise
- * it will get more aggressive to avoid that too much memory is used by
- * keys that can be removed from the keyspace. */
- for (j = 0; j < server.dbnum; j++) {
- int expired;
- redisDb *db = server.db+j;
-
- /* Continue to expire if at the end of the cycle more than 25%
- * of the keys were expired. */
- do {
- long num = dictSize(db->expires);
- time_t now = time(NULL);
-
- expired = 0;
- if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
- num = REDIS_EXPIRELOOKUPS_PER_CRON;
- while (num--) {
- dictEntry *de;
- time_t t;
-
- if ((de = dictGetRandomKey(db->expires)) == NULL) break;
- t = (time_t) dictGetEntryVal(de);
- if (now > t) {
- sds key = dictGetEntryKey(de);
- robj *keyobj = createStringObject(key,sdslen(key));
-
- dbDelete(db,keyobj);
- decrRefCount(keyobj);
- expired++;
- server.stat_expiredkeys++;
- }
- }
- } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
- }
+ /* Expire a few keys per cycle, only if this is a master.
+ * On slaves we wait for DEL operations synthesized by the master
+ * in order to guarantee a strict consistency. */
+ if (server.masterhost == NULL) activeExpireCycle();
/* Swap a few keys on disk if we are over the memory limit and VM
* is enbled. Try to free objects from the free list first. */
server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
+ server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
server.shutdown_asap = 0;
resetServerSaveParams();
} else if (c->multibulk) {
if (c->bulklen == -1) {
if (((char*)c->argv[0]->ptr)[0] != '$') {
- addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
+ addReplyError(c,"multi bulk protocol error");
resetClient(c);
return 1;
} else {
- int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
+ char *eptr;
+ long bulklen = strtol(((char*)c->argv[0]->ptr)+1,&eptr,10);
+ int perr = eptr[0] != '\0';
+
decrRefCount(c->argv[0]);
- if (bulklen < 0 || bulklen > 1024*1024*1024) {
+ if (perr || bulklen == LONG_MIN || bulklen == LONG_MAX ||
+ bulklen < 0 || bulklen > 1024*1024*1024)
+ {
c->argc--;
- addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
+ addReplyError(c,"invalid bulk write count");
resetClient(c);
return 1;
}
* such wrong arity, bad command name and so forth. */
cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) {
- addReplySds(c,
- sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
- (char*)c->argv[0]->ptr));
+ addReplyErrorFormat(c,"unknown command '%s'",
+ (char*)c->argv[0]->ptr);
resetClient(c);
return 1;
} else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
(c->argc < -cmd->arity)) {
- addReplySds(c,
- sdscatprintf(sdsempty(),
- "-ERR wrong number of arguments for '%s' command\r\n",
- cmd->name));
+ addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
+ cmd->name);
resetClient(c);
return 1;
} else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
/* This is a bulk command, we have to read the last argument yet. */
- int bulklen = atoi(c->argv[c->argc-1]->ptr);
+ char *eptr;
+ long bulklen = strtol(c->argv[c->argc-1]->ptr,&eptr,10);
+ int perr = eptr[0] != '\0';
decrRefCount(c->argv[c->argc-1]);
- if (bulklen < 0 || bulklen > 1024*1024*1024) {
+ if (perr || bulklen == LONG_MAX || bulklen == LONG_MIN ||
+ bulklen < 0 || bulklen > 1024*1024*1024)
+ {
c->argc--;
- addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
+ addReplyError(c,"invalid bulk write count");
resetClient(c);
return 1;
}
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
- addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
+ addReplyError(c,"operation not permitted");
resetClient(c);
return 1;
}
if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
zmalloc_used_memory() > server.maxmemory)
{
- addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
+ addReplyError(c,"command not allowed when used memory > 'maxmemory'");
resetClient(c);
return 1;
}
&&
cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
- addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n"));
+ addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
resetClient(c);
return 1;
}
if (server.vm_enabled) unlink(server.vm_swap_file);
} else {
/* Snapshotting. Perform a SYNC SAVE and exit */
- if (rdbSave(server.dbfilename) == REDIS_OK) {
- if (server.daemonize)
- unlink(server.pidfile);
- redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
- } else {
+ if (rdbSave(server.dbfilename) != REDIS_OK) {
/* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background
return REDIS_ERR;
}
}
+ if (server.daemonize) unlink(server.pidfile);
redisLog(REDIS_WARNING,"Server exit now, bye bye...");
return REDIS_OK;
}
addReply(c,shared.ok);
} else {
c->authenticated = 0;
- addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
+ addReplyError(c,"invalid password");
}
}
time_t uptime = time(NULL)-server.stat_starttime;
int j;
char hmem[64];
+ struct rusage self_ru, c_ru;
+
+ getrusage(RUSAGE_SELF, &self_ru);
+ getrusage(RUSAGE_CHILDREN, &c_ru);
bytesToHuman(hmem,zmalloc_used_memory());
info = sdscatprintf(sdsempty(),
"process_id:%ld\r\n"
"uptime_in_seconds:%ld\r\n"
"uptime_in_days:%ld\r\n"
+ "used_cpu_sys:%.2f\r\n"
+ "used_cpu_user:%.2f\r\n"
+ "used_cpu_sys_childrens:%.2f\r\n"
+ "used_cpu_user_childrens:%.2f\r\n"
"connected_clients:%d\r\n"
"connected_slaves:%d\r\n"
"blocked_clients:%d\r\n"
"used_memory:%zu\r\n"
"used_memory_human:%s\r\n"
+ "mem_fragmentation_ratio:%.2f\r\n"
"changes_since_last_save:%lld\r\n"
"bgsave_in_progress:%d\r\n"
"last_save_time:%ld\r\n"
(long) getpid(),
uptime,
uptime/(3600*24),
+ (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000,
+ (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000,
+ (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000,
+ (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
server.blpop_blocked_clients,
zmalloc_used_memory(),
hmem,
+ zmalloc_get_fragmentation_ratio(),
server.dirty,
server.bgsavechildpid != -1,
server.lastsave,
if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
for (j = 0; j < server.dbnum; j++) {
int minttl = -1;
- robj *minkey = NULL;
+ sds minkey = NULL;
+ robj *keyobj = NULL;
struct dictEntry *de;
if (dictSize(server.db[j].expires)) {
minttl = t;
}
}
- dbDelete(server.db+j,minkey);
+ keyobj = createStringObject(minkey,sdslen(minkey));
+ dbDelete(server.db+j,keyobj);
+ server.stat_expiredkeys++;
+ decrRefCount(keyobj);
}
}
if (!freed) return; /* nothing to free... */
}
#endif /* __linux__ */
+/* Write the current process id, followed by a newline, to server.pidfile.
+ * This is best-effort: if the file cannot be opened nothing is written and
+ * no error is reported. */
+void createPidFile(void) {
+    FILE *fp = fopen(server.pidfile,"w");
+
+    if (fp) {
+        /* pid_t is not guaranteed to be an int, so passing it straight to
+         * "%d" is not portable; widen it to long explicitly, the same way
+         * the INFO output prints getpid(). */
+        fprintf(fp,"%ld\n",(long) getpid());
+        fclose(fp);
+    }
+}
+
void daemonize(void) {
int fd;
- FILE *fp;
if (fork() != 0) exit(0); /* parent exits */
setsid(); /* create a new session */
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
- /* Try to write the pid file */
- fp = fopen(server.pidfile,"w");
- if (fp) {
- fprintf(fp,"%d\n",getpid());
- fclose(fp);
- }
}
void version() {
}
if (server.daemonize) daemonize();
initServer();
+ if (server.daemonize) createPidFile();
redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__
linuxOvercommitMemoryWarning();
redisLog(REDIS_WARNING,"%s", messages[i]);
/* free(messages); Don't call free() with possibly corrupted memory. */
+ if (server.daemonize) unlink(server.pidfile);
_exit(0);
}
#include "anet.h" /* Networking the easy way */
#include "zipmap.h" /* Compact string -> string data structure */
#include "ziplist.h" /* Compact list data structure */
+#include "intset.h" /* Compact integer set structure */
#include "version.h"
/* Error codes */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
#define REDIS_SHARED_INTEGERS 10000
+#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
#define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
+#define REDIS_ENCODING_INTSET 6 /* Encoded as intset */
/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_HASH_MAX_ZIPMAP_VALUE 512
#define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024
#define REDIS_LIST_MAX_ZIPLIST_VALUE 32
+#define REDIS_SET_MAX_INTSET_ENTRIES 4096
/* Sets operations codes */
#define REDIS_OP_UNION 0
sds querybuf;
robj **argv, **mbargv;
int argc, mbargc;
- int bulklen; /* bulk read len. -1 if not in bulk read mode */
+ long bulklen; /* bulk read len. -1 if not in bulk read mode */
int multibulk; /* multi bulk command format active */
list *reply;
int sentlen;
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
+
+ /* Response buffer */
+ int bufpos;
+ char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;
struct saveparam {
int fd;
redisDb *db;
long long dirty; /* changes to DB from the last save */
+ long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */
list *clients;
list *slaves, *monitors;
char neterr[ANET_ERR_LEN];
size_t hash_max_zipmap_value;
size_t list_max_ziplist_entries;
size_t list_max_ziplist_value;
+ size_t set_max_intset_entries;
/* Virtual memory state */
FILE *vm_fp;
int vm_fd;
} redisSortOperation;
/* ZSETs use a specialized version of Skiplists */
-
+/* Skiplist node. The per-level links are stored inline through the flexible
+ * array member 'level', so a node is a single allocation of
+ * sizeof(zskiplistNode) + level*sizeof(struct zskiplistLevel) bytes
+ * (see zslCreateNode). */
typedef struct zskiplistNode {
- struct zskiplistNode **forward;
- struct zskiplistNode *backward;
- unsigned int *span;
- double score;
robj *obj;
+ double score; /* zset dict entries store &node->score as their value */
+ struct zskiplistNode *backward; /* previous node at level 0; NULL for the first node */
+ struct zskiplistLevel {
+ struct zskiplistNode *forward; /* next node at this level, or NULL */
+ unsigned int span; /* added to the rank when following 'forward' */
+ } level[];
} zskiplistNode;
typedef struct zskiplist {
listNode *ln; /* Entry in linked list */
} listTypeEntry;
+/* Iterator over the elements of a set object, independent of how the set is
+ * encoded. setTypeInitIterator() returns one, setTypeNext() advances it and
+ * setTypeReleaseIterator() releases it. */
+typedef struct {
+ robj *subject; /* the set object being iterated */
+ int encoding; /* presumably a copy of subject's encoding (intset or hash table) — confirm */
+ int ii; /* intset iterator (presumably the current index) when intset-encoded */
+ dictIterator *di; /* presumably used only for hash-table-encoded sets — confirm */
+} setTypeIterator;
+
/* Structure to hold hash iteration abstration. Note that iteration over
* hashes involves both fields and values. Because it is possible that
* not both are required, store pointers in the iterator to avoid
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
void addReply(redisClient *c, robj *obj);
+void *addDeferredMultiBulkLength(redisClient *c);
+void setDeferredMultiBulkLength(redisClient *c, void *node, long length);
void addReplySds(redisClient *c, sds s);
void processInputBuffer(redisClient *c);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void addReply(redisClient *c, robj *obj);
void addReplySds(redisClient *c, sds s);
+void addReplyError(redisClient *c, char *err);
+void addReplyStatus(redisClient *c, char *status);
void addReplyDouble(redisClient *c, double d);
void addReplyLongLong(redisClient *c, long long ll);
-void addReplyUlong(redisClient *c, unsigned long ul);
+void addReplyMultiBulkLen(redisClient *c, long length);
void *dupClientReplyValue(void *o);
+#ifdef __GNUC__
+void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
+ __attribute__((format(printf, 2, 3)));
+void addReplyStatusFormat(redisClient *c, const char *fmt, ...)
+ __attribute__((format(printf, 2, 3)));
+#else
+void addReplyErrorFormat(redisClient *c, const char *fmt, ...);
+void addReplyStatusFormat(redisClient *c, const char *fmt, ...);
+#endif
+
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
void listTypePush(robj *subject, robj *value, int where);
robj *createListObject(void);
robj *createZiplistObject(void);
robj *createSetObject(void);
+robj *createIntsetObject(void);
robj *createHashObject(void);
robj *createZsetObject(void);
int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg);
/* Sorted sets data type */
zskiplist *zslCreate(void);
void zslFree(zskiplist *zsl);
- void zslInsert(zskiplist *zsl, double score, robj *obj);
+ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
/* Core functions */
void freeMemoryIfNeeded(void);
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
vmpointer *vmSwapObjectBlocking(robj *val);
+/* Set data type */
+robj *setTypeCreate(robj *value);
+int setTypeAdd(robj *subject, robj *value);
+int setTypeRemove(robj *subject, robj *value);
+int setTypeIsMember(robj *subject, robj *value);
+setTypeIterator *setTypeInitIterator(robj *subject);
+void setTypeReleaseIterator(setTypeIterator *si);
+robj *setTypeNext(setTypeIterator *si);
+robj *setTypeRandomElement(robj *subject);
+unsigned long setTypeSize(robj *subject);
+void setTypeConvert(robj *subject, int enc);
+
/* Hash data type */
void convertToRealHash(robj *o);
void hashTypeTryConversion(robj *subject, robj **argv, int start, int end);
long long memtoll(const char *p, int *err);
int ll2string(char *s, size_t len, long long value);
int isStringRepresentableAsLong(sds s, long *longval);
+int isStringRepresentableAsLongLong(sds s, long long *longval);
+int isObjectRepresentableAsLongLong(robj *o, long long *llongval);
/* Configuration */
void loadServerConfig(char *filename);
/* db.c -- Keyspace access API */
int removeExpire(redisDb *db, robj *key);
+void propagateExpire(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key);
-int deleteIfVolatile(redisDb *db, robj *key);
time_t getExpire(redisDb *db, robj *key);
-int setExpire(redisDb *db, robj *key, time_t when);
+void setExpire(redisDb *db, robj *key, time_t when);
robj *lookupKey(redisDb *db, robj *key);
robj *lookupKeyRead(redisDb *db, robj *key);
robj *lookupKeyWrite(redisDb *db, robj *key);
void expireatCommand(redisClient *c);
void getsetCommand(redisClient *c);
void ttlCommand(redisClient *c);
+void persistCommand(redisClient *c);
void slaveofCommand(redisClient *c);
void debugCommand(redisClient *c);
void msetCommand(redisClient *c);
* from tail to head, useful for ZREVRANGE. */
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
- zskiplistNode *zn = zmalloc(sizeof(*zn));
-
- zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
- if (level > 1)
- zn->span = zmalloc(sizeof(unsigned int) * (level - 1));
- else
- zn->span = NULL;
+ zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->obj = obj;
return zn;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
- zsl->header->forward[j] = NULL;
-
- /* span has space for ZSKIPLIST_MAXLEVEL-1 elements */
- if (j < ZSKIPLIST_MAXLEVEL-1)
- zsl->header->span[j] = 0;
+ zsl->header->level[j].forward = NULL;
+ zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
+/* Release one skiplist node: drop the node's reference to its element
+ * object, then free the node. The per-level links live in the same
+ * allocation (flexible array member 'level'), so one zfree() is enough.
+ * NOTE: the zset dict stores &node->score as its value, so that value must
+ * not be read after the node has been freed. */
void zslFreeNode(zskiplistNode *node) {
decrRefCount(node->obj);
- zfree(node->forward);
- zfree(node->span);
zfree(node);
}
void zslFree(zskiplist *zsl) {
- zskiplistNode *node = zsl->header->forward[0], *next;
+ zskiplistNode *node = zsl->header->level[0].forward, *next;
- zfree(zsl->header->forward);
- zfree(zsl->header->span);
zfree(zsl->header);
while(node) {
- next = node->forward[0];
+ next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
- void zslInsert(zskiplist *zsl, double score, robj *obj) {
+ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
-
- while (x->forward[i] &&
- (x->forward[i]->score < score ||
- (x->forward[i]->score == score &&
- compareStringObjects(x->forward[i]->obj,obj) < 0))) {
- rank[i] += i > 0 ? x->span[i-1] : 1;
- x = x->forward[i];
+ while (x->level[i].forward &&
+ (x->level[i].forward->score < score ||
+ (x->level[i].forward->score == score &&
+ compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
+ rank[i] += x->level[i].span;
+ x = x->level[i].forward;
}
update[i] = x;
}
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
- update[i]->span[i-1] = zsl->length;
+ update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
- x->forward[i] = update[i]->forward[i];
- update[i]->forward[i] = x;
+ x->level[i].forward = update[i]->level[i].forward;
+ update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
- if (i > 0) {
- x->span[i-1] = update[i]->span[i-1] - (rank[0] - rank[i]);
- update[i]->span[i-1] = (rank[0] - rank[i]) + 1;
- }
+ x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
+ update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
- update[i]->span[i-1]++;
+ update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
- if (x->forward[0])
- x->forward[0]->backward = x;
+ if (x->level[0].forward)
+ x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
+ return x;
}
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
- if (update[i]->forward[i] == x) {
- if (i > 0) {
- update[i]->span[i-1] += x->span[i-1] - 1;
- }
- update[i]->forward[i] = x->forward[i];
+ if (update[i]->level[i].forward == x) {
+ update[i]->level[i].span += x->level[i].span - 1;
+ update[i]->level[i].forward = x->level[i].forward;
} else {
- /* invariant: i > 0, because update[0]->forward[0]
- * is always equal to x */
- update[i]->span[i-1] -= 1;
+ update[i]->level[i].span -= 1;
}
}
- if (x->forward[0]) {
- x->forward[0]->backward = x->backward;
+ if (x->level[0].forward) {
+ x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
- while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
+ while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
- while (x->forward[i] &&
- (x->forward[i]->score < score ||
- (x->forward[i]->score == score &&
- compareStringObjects(x->forward[i]->obj,obj) < 0)))
- x = x->forward[i];
+ while (x->level[i].forward &&
+ (x->level[i].forward->score < score ||
+ (x->level[i].forward->score == score &&
+ compareStringObjects(x->level[i].forward->obj,obj) < 0)))
+ x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
- x = x->forward[0];
+ x = x->level[0].forward;
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
- while (x->forward[i] && x->forward[i]->score < min)
- x = x->forward[i];
+ while (x->level[i].forward && x->level[i].forward->score < min)
+ x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
- x = x->forward[0];
+ x = x->level[0].forward;
while (x && x->score <= max) {
- zskiplistNode *next = x->forward[0];
- zslDeleteNode(zsl, x, update);
+ zskiplistNode *next = x->level[0].forward;
+ zslDeleteNode(zsl,x,update);
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
- while (x->forward[i] && (traversed + (i > 0 ? x->span[i-1] : 1)) < start) {
- traversed += i > 0 ? x->span[i-1] : 1;
- x = x->forward[i];
+ while (x->level[i].forward && (traversed + x->level[i].span) < start) {
+ traversed += x->level[i].span;
+ x = x->level[i].forward;
}
update[i] = x;
}
traversed++;
- x = x->forward[0];
+ x = x->level[0].forward;
while (x && traversed <= end) {
- zskiplistNode *next = x->forward[0];
- zslDeleteNode(zsl, x, update);
+ zskiplistNode *next = x->level[0].forward;
+ zslDeleteNode(zsl,x,update);
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
- while (x->forward[i] && x->forward[i]->score < score)
- x = x->forward[i];
+ while (x->level[i].forward && x->level[i].forward->score < score)
+ x = x->level[i].forward;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
- return x->forward[0];
+ return x->level[0].forward;
}
/* Find the rank for an element by both score and key.
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
- while (x->forward[i] &&
- (x->forward[i]->score < score ||
- (x->forward[i]->score == score &&
- compareStringObjects(x->forward[i]->obj,o) <= 0))) {
- rank += i > 0 ? x->span[i-1] : 1;
- x = x->forward[i];
+ while (x->level[i].forward &&
+ (x->level[i].forward->score < score ||
+ (x->level[i].forward->score == score &&
+ compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
+ rank += x->level[i].span;
+ x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
- while (x->forward[i] && (traversed + (i>0 ? x->span[i-1] : 1)) <= rank)
+ while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
- traversed += i > 0 ? x->span[i-1] : 1;
- x = x->forward[i];
+ traversed += x->level[i].span;
+ x = x->level[i].forward;
}
if (traversed == rank) {
return x;
* Sorted set commands
*----------------------------------------------------------------------------*/
- /* This generic command implements both ZADD and ZINCRBY.
- * scoreval is the score if the operation is a ZADD (doincrement == 0) or
- * the increment if the operation is a ZINCRBY (doincrement == 1). */
- void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
+ /* This generic command implements both ZADD and ZINCRBY. */
+ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int incr) {
robj *zsetobj;
zset *zs;
- double *score;
+ zskiplistNode *znode;
zsetobj = lookupKeyWrite(c->db,key);
if (zsetobj == NULL) {
}
zs = zsetobj->ptr;
- /* Ok now since we implement both ZADD and ZINCRBY here the code
- * needs to handle the two different conditions. It's all about setting
- * '*score', that is, the new score to set, to the right value. */
- score = zmalloc(sizeof(double));
- if (doincrement) {
- dictEntry *de;
-
+ /* Since both ZADD and ZINCRBY are implemented here, we need to increment
+ * the score first by the current score if ZINCRBY is called. */
+ if (incr) {
/* Read the old score. If the element was not present starts from 0 */
- de = dictFind(zs->dict,ele);
- if (de) {
- double *oldscore = dictGetEntryVal(de);
- *score = *oldscore + scoreval;
- } else {
- *score = scoreval;
- }
- if (isnan(*score)) {
+ dictEntry *de = dictFind(zs->dict,ele);
+ if (de != NULL)
+ score += *(double*)dictGetEntryVal(de);
+
+ if (isnan(score)) {
- addReplySds(c,
- sdsnew("-ERR resulting score is not a number (NaN)\r\n"));
+ addReplyError(c,"resulting score is not a number (NaN)");
- zfree(score);
/* Note that we don't need to check if the zset may be empty and
* should be removed here, as we can only obtain Nan as score if
* there was already an element in the sorted set. */
return;
}
- } else {
- *score = scoreval;
}
- /* What follows is a simple remove and re-insert operation that is common
- * to both ZADD and ZINCRBY... */
- if (dictAdd(zs->dict,ele,score) == DICT_OK) {
- /* case 1: New element */
+ /* We need to remove and re-insert the element when it was already present
+ * in the dictionary, to update the skiplist. Note that we delay adding a
+ * pointer to the score because we want to reference the score in the
+ * skiplist node. */
+ if (dictAdd(zs->dict,ele,NULL) == DICT_OK) {
+ dictEntry *de;
+
+ /* New element */
incrRefCount(ele); /* added to hash */
- zslInsert(zs->zsl,*score,ele);
+ znode = zslInsert(zs->zsl,score,ele);
incrRefCount(ele); /* added to skiplist */
+
+ /* Update the score in the dict entry */
+ de = dictFind(zs->dict,ele);
+ redisAssert(de != NULL);
+ dictGetEntryVal(de) = &znode->score;
touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
- if (doincrement)
- addReplyDouble(c,*score);
+ if (incr)
+ addReplyDouble(c,score);
else
addReply(c,shared.cone);
} else {
dictEntry *de;
- double *oldscore;
+ robj *curobj;
+ double *curscore;
+ int deleted;
- /* case 2: Score update operation */
+ /* Update score */
de = dictFind(zs->dict,ele);
redisAssert(de != NULL);
- oldscore = dictGetEntryVal(de);
- if (*score != *oldscore) {
- int deleted;
+ curobj = dictGetEntryKey(de);
+ curscore = dictGetEntryVal(de);
- /* Remove and insert the element in the skip list with new score */
- deleted = zslDelete(zs->zsl,*oldscore,ele);
+ /* When the score is updated, reuse the existing string object to
+ * prevent extra alloc/dealloc of strings on ZINCRBY. */
+ if (score != *curscore) {
+ deleted = zslDelete(zs->zsl,*curscore,curobj);
redisAssert(deleted != 0);
- zslInsert(zs->zsl,*score,ele);
- incrRefCount(ele);
- /* Update the score in the hash table */
- dictReplace(zs->dict,ele,score);
+ znode = zslInsert(zs->zsl,score,curobj);
+ incrRefCount(curobj);
+
+ /* Update the score in the current dict entry */
+ dictGetEntryVal(de) = &znode->score;
touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
- } else {
- zfree(score);
}
- if (doincrement)
- addReplyDouble(c,*score);
+ if (incr)
+ addReplyDouble(c,score);
else
addReply(c,shared.czero);
}
robj *zsetobj;
zset *zs;
dictEntry *de;
- double *oldscore;
+ double curscore;
int deleted;
if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
return;
}
/* Delete from the skiplist */
- oldscore = dictGetEntryVal(de);
- deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
+ curscore = *(double*)dictGetEntryVal(de);
+ deleted = zslDelete(zs->zsl,curscore,c->argv[2]);
redisAssert(deleted != 0);
/* Delete from the hash table */
zsetopsrc *src;
robj *dstobj;
zset *dstzset;
+ zskiplistNode *znode;
dictIterator *di;
dictEntry *de;
int touched = 0;
/* expect setnum input keys to be given */
setnum = atoi(c->argv[2]->ptr);
if (setnum < 1) {
- addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n"));
+ addReplyError(c,
+ "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
return;
}
zfree(score);
} else {
robj *o = dictGetEntryKey(de);
- dictAdd(dstzset->dict,o,score);
- incrRefCount(o); /* added to dictionary */
- zslInsert(dstzset->zsl,*score,o);
+ znode = zslInsert(dstzset->zsl,*score,o);
incrRefCount(o); /* added to skiplist */
+ dictAdd(dstzset->dict,o,&znode->score);
+ incrRefCount(o); /* added to dictionary */
}
}
dictReleaseIterator(di);
}
robj *o = dictGetEntryKey(de);
- dictAdd(dstzset->dict,o,score);
- incrRefCount(o); /* added to dictionary */
- zslInsert(dstzset->zsl,*score,o);
+ znode = zslInsert(dstzset->zsl,*score,o);
incrRefCount(o); /* added to skiplist */
+ dictAdd(dstzset->dict,o,&znode->score);
+ incrRefCount(o); /* added to dictionary */
}
dictReleaseIterator(di);
}
ln = start == 0 ? zsl->tail : zslistTypeGetElementByRank(zsl, llen-start);
} else {
ln = start == 0 ?
- zsl->header->forward[0] : zslistTypeGetElementByRank(zsl, start+1);
+ zsl->header->level[0].forward : zslistTypeGetElementByRank(zsl, start+1);
}
/* Return the result in form of a multi-bulk reply */
- addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
- withscores ? (rangelen*2) : rangelen));
+ addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen);
for (j = 0; j < rangelen; j++) {
ele = ln->obj;
addReplyBulk(c,ele);
if (withscores)
addReplyDouble(c,ln->score);
- ln = reverse ? ln->backward : ln->forward[0];
+ ln = reverse ? ln->backward : ln->level[0].forward;
}
}
if (c->argc != (4 + withscores) && c->argc != (7 + withscores))
badsyntax = 1;
if (badsyntax) {
- addReplySds(c,
- sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
+ addReplyError(c,"wrong number of arguments for ZRANGEBYSCORE");
return;
}
zset *zsetobj = o->ptr;
zskiplist *zsl = zsetobj->zsl;
zskiplistNode *ln;
- robj *ele, *lenobj = NULL;
+ robj *ele;
+ void *replylen = NULL;
unsigned long rangelen = 0;
/* Get the first node with the score >= min, or with
* score > min if 'minex' is true. */
ln = zslFirstWithScore(zsl,min);
- while (minex && ln && ln->score == min) ln = ln->forward[0];
+ while (minex && ln && ln->score == min) ln = ln->level[0].forward;
if (ln == NULL) {
/* No element matching the speciifed interval */
* are in the list, so we push this object that will represent
* the multi-bulk length in the output buffer, and will "fix"
* it later */
- if (!justcount) {
- lenobj = createObject(REDIS_STRING,NULL);
- addReply(c,lenobj);
- decrRefCount(lenobj);
- }
+ if (!justcount)
+ replylen = addDeferredMultiBulkLength(c);
while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) {
if (offset) {
offset--;
- ln = ln->forward[0];
+ ln = ln->level[0].forward;
continue;
}
if (limit == 0) break;
if (withscores)
addReplyDouble(c,ln->score);
}
- ln = ln->forward[0];
+ ln = ln->level[0].forward;
rangelen++;
if (limit > 0) limit--;
}
if (justcount) {
addReplyLongLong(c,(long)rangelen);
} else {
- lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",
+ setDeferredMultiBulkLength(c,replylen,
withscores ? (rangelen*2) : rangelen);
}
}
checkType(c,o,REDIS_ZSET)) return;
zs = o->ptr;
- addReplyUlong(c,zs->zsl->length);
+ addReplyLongLong(c,zs->zsl->length);
}
void zscoreCommand(redisClient *c) {