From: Pieter Noordhuis Date: Thu, 16 Sep 2010 12:32:30 +0000 (+0200) Subject: Merge branch 'zset-mem' into zrevrangebyscore X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/192fc3376a0712e69e638b087c82c7e34f698f4d?hp=-c Merge branch 'zset-mem' into zrevrangebyscore --- 192fc3376a0712e69e638b087c82c7e34f698f4d diff --combined src/redis.c index caa96247,e6a1a137..7b1b3f4f --- a/src/redis.c +++ b/src/redis.c @@@ -51,7 -51,6 +51,7 @@@ #include #include #include +#include /* Our shared "common" objects */ @@@ -171,7 -170,6 +171,7 @@@ struct redisCommand readonlyCommandTabl {"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, + {"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0}, {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0}, @@@ -340,7 -338,7 +340,7 @@@ dictType zsetDictType = NULL, /* val dup */ dictEncObjKeyCompare, /* key compare */ dictRedisObjectDestructor, /* key destructor */ - dictVanillaFree /* val destructor of malloc(sizeof(double)) */ + NULL /* val destructor */ }; /* Db->dict, keys are sds strings, vals are Redis objects. */ @@@ -437,48 -435,6 +437,48 @@@ void updateDictResizePolicy(void) /* ======================= Cron: called every 100 ms ======================== */ +/* Try to expire a few timed out keys. The algorithm used is adaptive and + * will use few CPU cycles if there are few expiring keys, otherwise + * it will get more aggressive to avoid that too much memory is used by + * keys that can be removed from the keyspace. */ +void activeExpireCycle(void) { + int j; + + for (j = 0; j < server.dbnum; j++) { + int expired; + redisDb *db = server.db+j; + + /* Continue to expire if at the end of the cycle more than 25% + * of the keys were expired. */ + do { + long num = dictSize(db->expires); + time_t now = time(NULL); + + expired = 0; + if (num > REDIS_EXPIRELOOKUPS_PER_CRON) + num = REDIS_EXPIRELOOKUPS_PER_CRON; + while (num--) { + dictEntry *de; + time_t t; + + if ((de = dictGetRandomKey(db->expires)) == NULL) break; + t = (time_t) dictGetEntryVal(de); + if (now > t) { + sds key = dictGetEntryKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + + propagateExpire(db,keyobj); + dbDelete(db,keyobj); + decrRefCount(keyobj); + expired++; + server.stat_expiredkeys++; + } + } + } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); + } +} + + int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); @@@ -577,10 -533,41 +577,10 @@@ } } - /* Try to expire a few timed out keys. The algorithm used is adaptive and - * will use few CPU cycles if there are few expiring keys, otherwise - * it will get more aggressive to avoid that too much memory is used by - * keys that can be removed from the keyspace. */ - for (j = 0; j < server.dbnum; j++) { - int expired; - redisDb *db = server.db+j; - - /* Continue to expire if at the end of the cycle more than 25% - * of the keys were expired. */ - do { - long num = dictSize(db->expires); - time_t now = time(NULL); - - expired = 0; - if (num > REDIS_EXPIRELOOKUPS_PER_CRON) - num = REDIS_EXPIRELOOKUPS_PER_CRON; - while (num--) { - dictEntry *de; - time_t t; - - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - t = (time_t) dictGetEntryVal(de); - if (now > t) { - sds key = dictGetEntryKey(de); - robj *keyobj = createStringObject(key,sdslen(key)); - - dbDelete(db,keyobj); - decrRefCount(keyobj); - expired++; - server.stat_expiredkeys++; - } - } - } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); - } + /* Expire a few keys per cycle, only if this is a master. + * On slaves we wait for DEL operations synthesized by the master + * in order to guarantee a strict consistency. */ + if (server.masterhost == NULL) activeExpireCycle(); /* Swap a few keys on disk if we are over the memory limit and VM * is enbled. Try to free objects from the free list first. */ @@@ -744,7 -731,6 +744,7 @@@ void initServerConfig() server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE; server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES; server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE; + server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES; server.shutdown_asap = 0; resetServerSaveParams(); @@@ -910,20 -896,15 +910,20 @@@ int processCommand(redisClient *c) } else if (c->multibulk) { if (c->bulklen == -1) { if (((char*)c->argv[0]->ptr)[0] != '$') { - addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n")); + addReplyError(c,"multi bulk protocol error"); resetClient(c); return 1; } else { - int bulklen = atoi(((char*)c->argv[0]->ptr)+1); + char *eptr; + long bulklen = strtol(((char*)c->argv[0]->ptr)+1,&eptr,10); + int perr = eptr[0] != '\0'; + decrRefCount(c->argv[0]); - if (bulklen < 0 || bulklen > 1024*1024*1024) { + if (perr || bulklen == LONG_MIN || bulklen == LONG_MAX || + bulklen < 0 || bulklen > 1024*1024*1024) + { c->argc--; - addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); + addReplyError(c,"invalid bulk write count"); resetClient(c); return 1; } @@@ -976,28 -957,27 +976,28 @@@ * such wrong arity, bad command name and so forth. */ cmd = lookupCommand(c->argv[0]->ptr); if (!cmd) { - addReplySds(c, - sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n", - (char*)c->argv[0]->ptr)); + addReplyErrorFormat(c,"unknown command '%s'", + (char*)c->argv[0]->ptr); resetClient(c); return 1; } else if ((cmd->arity > 0 && cmd->arity != c->argc) || (c->argc < -cmd->arity)) { - addReplySds(c, - sdscatprintf(sdsempty(), - "-ERR wrong number of arguments for '%s' command\r\n", - cmd->name)); + addReplyErrorFormat(c,"wrong number of arguments for '%s' command", + cmd->name); resetClient(c); return 1; } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { /* This is a bulk command, we have to read the last argument yet. */ - int bulklen = atoi(c->argv[c->argc-1]->ptr); + char *eptr; + long bulklen = strtol(c->argv[c->argc-1]->ptr,&eptr,10); + int perr = eptr[0] != '\0'; decrRefCount(c->argv[c->argc-1]); - if (bulklen < 0 || bulklen > 1024*1024*1024) { + if (perr || bulklen == LONG_MAX || bulklen == LONG_MIN || + bulklen < 0 || bulklen > 1024*1024*1024) + { c->argc--; - addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); + addReplyError(c,"invalid bulk write count"); resetClient(c); return 1; } @@@ -1024,7 -1004,7 +1024,7 @@@ /* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { - addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); + addReplyError(c,"operation not permitted"); resetClient(c); return 1; } @@@ -1033,7 -1013,7 +1033,7 @@@ if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) && zmalloc_used_memory() > server.maxmemory) { - addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n")); + addReplyError(c,"command not allowed when used memory > 'maxmemory'"); resetClient(c); return 1; } @@@ -1043,7 -1023,7 +1043,7 @@@ && cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand && cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) { - addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n")); + addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); resetClient(c); return 1; } @@@ -1084,7 -1064,11 +1084,7 @@@ int prepareForShutdown() if (server.vm_enabled) unlink(server.vm_swap_file); } else { /* Snapshotting. Perform a SYNC SAVE and exit */ - if (rdbSave(server.dbfilename) == REDIS_OK) { - if (server.daemonize) - unlink(server.pidfile); - redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); - } else { + if (rdbSave(server.dbfilename) != REDIS_OK) { /* Ooops.. error saving! The best we can do is to continue * operating. Note that if there was a background saving process, * in the next cron() Redis will be notified that the background @@@ -1094,7 -1078,6 +1094,7 @@@ return REDIS_ERR; } } + if (server.daemonize) unlink(server.pidfile); redisLog(REDIS_WARNING,"Server exit now, bye bye..."); return REDIS_OK; } @@@ -1107,7 -1090,7 +1107,7 @@@ void authCommand(redisClient *c) addReply(c,shared.ok); } else { c->authenticated = 0; - addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n")); + addReplyError(c,"invalid password"); } } @@@ -1148,10 -1131,6 +1148,10 @@@ sds genRedisInfoString(void) time_t uptime = time(NULL)-server.stat_starttime; int j; char hmem[64]; + struct rusage self_ru, c_ru; + + getrusage(RUSAGE_SELF, &self_ru); + getrusage(RUSAGE_CHILDREN, &c_ru); bytesToHuman(hmem,zmalloc_used_memory()); info = sdscatprintf(sdsempty(), @@@ -1163,16 -1142,11 +1163,16 @@@ "process_id:%ld\r\n" "uptime_in_seconds:%ld\r\n" "uptime_in_days:%ld\r\n" + "used_cpu_sys:%.2f\r\n" + "used_cpu_user:%.2f\r\n" + "used_cpu_sys_childrens:%.2f\r\n" + "used_cpu_user_childrens:%.2f\r\n" "connected_clients:%d\r\n" "connected_slaves:%d\r\n" "blocked_clients:%d\r\n" "used_memory:%zu\r\n" "used_memory_human:%s\r\n" + "mem_fragmentation_ratio:%.2f\r\n" "changes_since_last_save:%lld\r\n" "bgsave_in_progress:%d\r\n" "last_save_time:%ld\r\n" @@@ -1194,16 -1168,11 +1194,16 @@@ (long) getpid(), uptime, uptime/(3600*24), + (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000, + (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000, + (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000, + (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000, listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), server.blpop_blocked_clients, zmalloc_used_memory(), hmem, + zmalloc_get_fragmentation_ratio(), server.dirty, server.bgsavechildpid != -1, server.lastsave, @@@ -1333,8 -1302,7 +1333,8 @@@ void freeMemoryIfNeeded(void) if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue; for (j = 0; j < server.dbnum; j++) { int minttl = -1; - robj *minkey = NULL; + sds minkey = NULL; + robj *keyobj = NULL; struct dictEntry *de; if (dictSize(server.db[j].expires)) { @@@ -1351,10 -1319,7 +1351,10 @@@ minttl = t; } } - dbDelete(server.db+j,minkey); + keyobj = createStringObject(minkey,sdslen(minkey)); + dbDelete(server.db+j,keyobj); + server.stat_expiredkeys++; + decrRefCount(keyobj); } } if (!freed) return; /* nothing to free... */ @@@ -1385,17 -1350,9 +1385,17 @@@ void linuxOvercommitMemoryWarning(void } #endif /* __linux__ */ +void createPidFile(void) { + /* Try to write the pid file in a best-effort way. */ + FILE *fp = fopen(server.pidfile,"w"); + if (fp) { + fprintf(fp,"%d\n",getpid()); + fclose(fp); + } +} + void daemonize(void) { int fd; - FILE *fp; if (fork() != 0) exit(0); /* parent exits */ setsid(); /* create a new session */ @@@ -1409,6 -1366,12 +1409,6 @@@ dup2(fd, STDERR_FILENO); if (fd > STDERR_FILENO) close(fd); } - /* Try to write the pid file */ - fp = fopen(server.pidfile,"w"); - if (fp) { - fprintf(fp,"%d\n",getpid()); - fclose(fp); - } } void version() { @@@ -1441,7 -1404,6 +1441,7 @@@ int main(int argc, char **argv) } if (server.daemonize) daemonize(); initServer(); + if (server.daemonize) createPidFile(); redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION); #ifdef __linux__ linuxOvercommitMemoryWarning(); @@@ -1518,7 -1480,6 +1518,7 @@@ void segvHandler(int sig, siginfo_t *in redisLog(REDIS_WARNING,"%s", messages[i]); /* free(messages); Don't call free() with possibly corrupted memory. */ + if (server.daemonize) unlink(server.pidfile); _exit(0); } diff --combined src/redis.h index 38727ae2,4b45f5f4..3de054f2 --- a/src/redis.h +++ b/src/redis.h @@@ -26,7 -26,6 +26,7 @@@ #include "anet.h" /* Networking the easy way */ #include "zipmap.h" /* Compact string -> string data structure */ #include "ziplist.h" /* Compact list data structure */ +#include "intset.h" /* Compact integer set structure */ #include "version.h" /* Error codes */ @@@ -47,7 -46,6 +47,7 @@@ #define REDIS_MAX_WRITE_PER_EVENT (1024*64) #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */ #define REDIS_SHARED_INTEGERS 10000 +#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */ /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */ #define REDIS_WRITEV_THRESHOLD 3 @@@ -84,7 -82,6 +84,7 @@@ #define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ #define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */ #define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ +#define REDIS_ENCODING_INTSET 6 /* Encoded as intset */ /* Object types only used for dumping to disk */ #define REDIS_EXPIRETIME 253 @@@ -191,7 -188,6 +191,7 @@@ #define REDIS_HASH_MAX_ZIPMAP_VALUE 512 #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024 #define REDIS_LIST_MAX_ZIPLIST_VALUE 32 +#define REDIS_SET_MAX_INTSET_ENTRIES 4096 /* Sets operations codes */ #define REDIS_OP_UNION 0 @@@ -287,7 -283,7 +287,7 @@@ typedef struct redisClient sds querybuf; robj **argv, **mbargv; int argc, mbargc; - int bulklen; /* bulk read len. -1 if not in bulk read mode */ + long bulklen; /* bulk read len. -1 if not in bulk read mode */ int multibulk; /* multi bulk command format active */ list *reply; int sentlen; @@@ -310,10 -306,6 +310,10 @@@ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ + + /* Response buffer */ + int bufpos; + char buf[REDIS_REPLY_CHUNK_BYTES]; } redisClient; struct saveparam { @@@ -340,7 -332,6 +340,7 @@@ struct redisServer int fd; redisDb *db; long long dirty; /* changes to DB from the last save */ + long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ list *clients; list *slaves, *monitors; char neterr[ANET_ERR_LEN]; @@@ -407,7 -398,6 +407,7 @@@ size_t hash_max_zipmap_value; size_t list_max_ziplist_entries; size_t list_max_ziplist_value; + size_t set_max_intset_entries; /* Virtual memory state */ FILE *vm_fp; int vm_fd; @@@ -490,13 -480,14 +490,14 @@@ typedef struct _redisSortOperation } redisSortOperation; /* ZSETs use a specialized version of Skiplists */ - typedef struct zskiplistNode { - struct zskiplistNode **forward; - struct zskiplistNode *backward; - unsigned int *span; - double score; robj *obj; + double score; + struct zskiplistNode *backward; + struct zskiplistLevel { + struct zskiplistNode *forward; + unsigned int span; + } level[]; } zskiplistNode; typedef struct zskiplist { @@@ -545,14 -536,6 +546,14 @@@ typedef struct listNode *ln; /* Entry in linked list */ } listTypeEntry; +/* Structure to hold set iteration abstraction. */ +typedef struct { + robj *subject; + int encoding; + int ii; /* intset iterator */ + dictIterator *di; +} setTypeIterator; + /* Structure to hold hash iteration abstration. Note that iteration over * hashes involves both fields and values. Because it is possible that * not both are required, store pointers in the iterator to avoid @@@ -593,8 -576,6 +594,8 @@@ void resetClient(redisClient *c) void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); +void *addDeferredMultiBulkLength(redisClient *c); +void setDeferredMultiBulkLength(redisClient *c, void *node, long length); void addReplySds(redisClient *c, sds s); void processInputBuffer(redisClient *c); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); @@@ -604,23 -585,11 +605,23 @@@ void addReplyBulkCString(redisClient *c void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); void addReplySds(redisClient *c, sds s); +void addReplyError(redisClient *c, char *err); +void addReplyStatus(redisClient *c, char *status); void addReplyDouble(redisClient *c, double d); void addReplyLongLong(redisClient *c, long long ll); -void addReplyUlong(redisClient *c, unsigned long ul); +void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); +#ifdef __GNUC__ +void addReplyErrorFormat(redisClient *c, const char *fmt, ...) + __attribute__((format(printf, 2, 3))); +void addReplyStatusFormat(redisClient *c, const char *fmt, ...) + __attribute__((format(printf, 2, 3))); +#else +void addReplyErrorFormat(redisClient *c, const char *fmt, ...); +void addReplyStatusFormat(redisClient *c, const char *fmt, ...); +#endif + /* List data type */ void listTypeTryConversion(robj *subject, robj *value); void listTypePush(robj *subject, robj *value, int where); @@@ -665,7 -634,6 +666,7 @@@ robj *createStringObjectFromLongLong(lo robj *createListObject(void); robj *createZiplistObject(void); robj *createSetObject(void); +robj *createIntsetObject(void); robj *createHashObject(void); robj *createZsetObject(void); int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg); @@@ -707,7 -675,7 +708,7 @@@ void backgroundRewriteDoneHandler(int s /* Sorted sets data type */ zskiplist *zslCreate(void); void zslFree(zskiplist *zsl); - void zslInsert(zskiplist *zsl, double score, robj *obj); + zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj); /* Core functions */ void freeMemoryIfNeeded(void); @@@ -749,18 -717,6 +750,18 @@@ int dontWaitForSwappedKey(redisClient * void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); vmpointer *vmSwapObjectBlocking(robj *val); +/* Set data type */ +robj *setTypeCreate(robj *value); +int setTypeAdd(robj *subject, robj *value); +int setTypeRemove(robj *subject, robj *value); +int setTypeIsMember(robj *subject, robj *value); +setTypeIterator *setTypeInitIterator(robj *subject); +void setTypeReleaseIterator(setTypeIterator *si); +robj *setTypeNext(setTypeIterator *si); +robj *setTypeRandomElement(robj *subject); +unsigned long setTypeSize(robj *subject); +void setTypeConvert(robj *subject, int enc); + /* Hash data type */ void convertToRealHash(robj *o); void hashTypeTryConversion(robj *subject, robj **argv, int start, int end); @@@ -789,8 -745,6 +790,8 @@@ int stringmatch(const char *pattern, co long long memtoll(const char *p, int *err); int ll2string(char *s, size_t len, long long value); int isStringRepresentableAsLong(sds s, long *longval); +int isStringRepresentableAsLongLong(sds s, long long *longval); +int isObjectRepresentableAsLongLong(robj *o, long long *llongval); /* Configuration */ void loadServerConfig(char *filename); @@@ -799,10 -753,10 +800,10 @@@ void resetServerSaveParams() /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); +void propagateExpire(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key); -int deleteIfVolatile(redisDb *db, robj *key); time_t getExpire(redisDb *db, robj *key); -int setExpire(redisDb *db, robj *key, time_t when); +void setExpire(redisDb *db, robj *key, time_t when); robj *lookupKey(redisDb *db, robj *key); robj *lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); @@@ -885,7 -839,6 +886,7 @@@ void expireCommand(redisClient *c) void expireatCommand(redisClient *c); void getsetCommand(redisClient *c); void ttlCommand(redisClient *c); +void persistCommand(redisClient *c); void slaveofCommand(redisClient *c); void debugCommand(redisClient *c); void msetCommand(redisClient *c); diff --combined src/t_zset.c index d944e923,3d9f612a..93ade5aa --- a/src/t_zset.c +++ b/src/t_zset.c @@@ -24,13 -24,7 +24,7 @@@ * from tail to head, useful for ZREVRANGE. */ zskiplistNode *zslCreateNode(int level, double score, robj *obj) { - zskiplistNode *zn = zmalloc(sizeof(*zn)); - - zn->forward = zmalloc(sizeof(zskiplistNode*) * level); - if (level > 1) - zn->span = zmalloc(sizeof(unsigned int) * (level - 1)); - else - zn->span = NULL; + zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); zn->score = score; zn->obj = obj; return zn; @@@ -45,11 -39,8 +39,8 @@@ zskiplist *zslCreate(void) zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { - zsl->header->forward[j] = NULL; - - /* span has space for ZSKIPLIST_MAXLEVEL-1 elements */ - if (j < ZSKIPLIST_MAXLEVEL-1) - zsl->header->span[j] = 0; + zsl->header->level[j].forward = NULL; + zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; @@@ -58,19 -49,15 +49,15 @@@ void zslFreeNode(zskiplistNode *node) { decrRefCount(node->obj); - zfree(node->forward); - zfree(node->span); zfree(node); } void zslFree(zskiplist *zsl) { - zskiplistNode *node = zsl->header->forward[0], *next; + zskiplistNode *node = zsl->header->level[0].forward, *next; - zfree(zsl->header->forward); - zfree(zsl->header->span); zfree(zsl->header); while(node) { - next = node->forward[0]; + next = node->level[0].forward; zslFreeNode(node); node = next; } @@@ -84,7 -71,7 +71,7 @@@ int zslRandomLevel(void) return (levellevel-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; - - while (x->forward[i] && - (x->forward[i]->score < score || - (x->forward[i]->score == score && - compareStringObjects(x->forward[i]->obj,obj) < 0))) { - rank[i] += i > 0 ? x->span[i-1] : 1; - x = x->forward[i]; + while (x->level[i].forward && + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + compareStringObjects(x->level[i].forward->obj,obj) < 0))) { + rank[i] += x->level[i].span; + x = x->level[i].forward; } update[i] = x; } @@@ -112,56 -98,51 +98,51 @@@ for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; - update[i]->span[i-1] = zsl->length; + update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,obj); for (i = 0; i < level; i++) { - x->forward[i] = update[i]->forward[i]; - update[i]->forward[i] = x; + x->level[i].forward = update[i]->level[i].forward; + update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ - if (i > 0) { - x->span[i-1] = update[i]->span[i-1] - (rank[0] - rank[i]); - update[i]->span[i-1] = (rank[0] - rank[i]) + 1; - } + x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); + update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i < zsl->level; i++) { - update[i]->span[i-1]++; + update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; - if (x->forward[0]) - x->forward[0]->backward = x; + if (x->level[0].forward) + x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; + return x; } /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { - if (update[i]->forward[i] == x) { - if (i > 0) { - update[i]->span[i-1] += x->span[i-1] - 1; - } - update[i]->forward[i] = x->forward[i]; + if (update[i]->level[i].forward == x) { + update[i]->level[i].span += x->level[i].span - 1; + update[i]->level[i].forward = x->level[i].forward; } else { - /* invariant: i > 0, because update[0]->forward[0] - * is always equal to x */ - update[i]->span[i-1] -= 1; + update[i]->level[i].span -= 1; } } - if (x->forward[0]) { - x->forward[0]->backward = x->backward; + if (x->level[0].forward) { + x->level[0].forward->backward = x->backward; } else { zsl->tail = x->backward; } - while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL) + while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) zsl->level--; zsl->length--; } @@@ -173,16 -154,16 +154,16 @@@ int zslDelete(zskiplist *zsl, double sc x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && - (x->forward[i]->score < score || - (x->forward[i]->score == score && - compareStringObjects(x->forward[i]->obj,obj) < 0))) - x = x->forward[i]; + while (x->level[i].forward && + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + compareStringObjects(x->level[i].forward->obj,obj) < 0))) + x = x->level[i].forward; update[i] = x; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ - x = x->forward[0]; + x = x->level[0].forward; if (x && score == x->score && equalStringObjects(x->obj,obj)) { zslDeleteNode(zsl, x, update); zslFreeNode(x); @@@ -204,16 -185,16 +185,16 @@@ unsigned long zslDeleteRangeByScore(zsk x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && x->forward[i]->score < min) - x = x->forward[i]; + while (x->level[i].forward && x->level[i].forward->score < min) + x = x->level[i].forward; update[i] = x; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ - x = x->forward[0]; + x = x->level[0].forward; while (x && x->score <= max) { - zskiplistNode *next = x->forward[0]; - zslDeleteNode(zsl, x, update); + zskiplistNode *next = x->level[0].forward; + zslDeleteNode(zsl,x,update); dictDelete(dict,x->obj); zslFreeNode(x); removed++; @@@ -231,18 -212,18 +212,18 @@@ unsigned long zslDeleteRangeByRank(zski x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && (traversed + (i > 0 ? x->span[i-1] : 1)) < start) { - traversed += i > 0 ? x->span[i-1] : 1; - x = x->forward[i]; + while (x->level[i].forward && (traversed + x->level[i].span) < start) { + traversed += x->level[i].span; + x = x->level[i].forward; } update[i] = x; } traversed++; - x = x->forward[0]; + x = x->level[0].forward; while (x && traversed <= end) { - zskiplistNode *next = x->forward[0]; - zslDeleteNode(zsl, x, update); + zskiplistNode *next = x->level[0].forward; + zslDeleteNode(zsl,x,update); dictDelete(dict,x->obj); zslFreeNode(x); removed++; @@@ -260,12 -241,12 +241,12 @@@ zskiplistNode *zslFirstWithScore(zskipl x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && x->forward[i]->score < score) - x = x->forward[i]; + while (x->level[i].forward && x->level[i].forward->score < score) + x = x->level[i].forward; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ - return x->forward[0]; + return x->level[0].forward; } /* Find the rank for an element by both score and key. @@@ -279,12 -260,12 +260,12 @@@ unsigned long zslistTypeGetRank(zskipli x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && - (x->forward[i]->score < score || - (x->forward[i]->score == score && - compareStringObjects(x->forward[i]->obj,o) <= 0))) { - rank += i > 0 ? x->span[i-1] : 1; - x = x->forward[i]; + while (x->level[i].forward && + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + compareStringObjects(x->level[i].forward->obj,o) <= 0))) { + rank += x->level[i].span; + x = x->level[i].forward; } /* x might be equal to zsl->header, so test if obj is non-NULL */ @@@ -303,10 -284,10 +284,10 @@@ zskiplistNode* zslistTypeGetElementByRa x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && (traversed + (i>0 ? x->span[i-1] : 1)) <= rank) + while (x->level[i].forward && (traversed + x->level[i].span) <= rank) { - traversed += i > 0 ? x->span[i-1] : 1; - x = x->forward[i]; + traversed += x->level[i].span; + x = x->level[i].forward; } if (traversed == rank) { return x; @@@ -319,13 -300,11 +300,11 @@@ * Sorted set commands *----------------------------------------------------------------------------*/ - /* This generic command implements both ZADD and ZINCRBY. - * scoreval is the score if the operation is a ZADD (doincrement == 0) or - * the increment if the operation is a ZINCRBY (doincrement == 1). */ - void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) { + /* This generic command implements both ZADD and ZINCRBY. */ + void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int incr) { robj *zsetobj; zset *zs; - double *score; + zskiplistNode *znode; zsetobj = lookupKeyWrite(c->db,key); if (zsetobj == NULL) { @@@ -339,71 -318,73 +318,72 @@@ } zs = zsetobj->ptr; - /* Ok now since we implement both ZADD and ZINCRBY here the code - * needs to handle the two different conditions. It's all about setting - * '*score', that is, the new score to set, to the right value. */ - score = zmalloc(sizeof(double)); - if (doincrement) { - dictEntry *de; - + /* Since both ZADD and ZINCRBY are implemented here, we need to increment + * the score first by the current score if ZINCRBY is called. */ + if (incr) { /* Read the old score. If the element was not present starts from 0 */ - de = dictFind(zs->dict,ele); - if (de) { - double *oldscore = dictGetEntryVal(de); - *score = *oldscore + scoreval; - } else { - *score = scoreval; - } - if (isnan(*score)) { + dictEntry *de = dictFind(zs->dict,ele); + if (de != NULL) + score += *(double*)dictGetEntryVal(de); + + if (isnan(score)) { - addReplySds(c, - sdsnew("-ERR resulting score is not a number (NaN)\r\n")); + addReplyError(c,"resulting score is not a number (NaN)"); - zfree(score); /* Note that we don't need to check if the zset may be empty and * should be removed here, as we can only obtain Nan as score if * there was already an element in the sorted set. */ return; } - } else { - *score = scoreval; } - /* What follows is a simple remove and re-insert operation that is common - * to both ZADD and ZINCRBY... */ - if (dictAdd(zs->dict,ele,score) == DICT_OK) { - /* case 1: New element */ + /* We need to remove and re-insert the element when it was already present + * in the dictionary, to update the skiplist. Note that we delay adding a + * pointer to the score because we want to reference the score in the + * skiplist node. */ + if (dictAdd(zs->dict,ele,NULL) == DICT_OK) { + dictEntry *de; + + /* New element */ incrRefCount(ele); /* added to hash */ - zslInsert(zs->zsl,*score,ele); + znode = zslInsert(zs->zsl,score,ele); incrRefCount(ele); /* added to skiplist */ + + /* Update the score in the dict entry */ + de = dictFind(zs->dict,ele); + redisAssert(de != NULL); + dictGetEntryVal(de) = &znode->score; touchWatchedKey(c->db,c->argv[1]); server.dirty++; - if (doincrement) - addReplyDouble(c,*score); + if (incr) + addReplyDouble(c,score); else addReply(c,shared.cone); } else { dictEntry *de; - double *oldscore; + robj *curobj; + double *curscore; + int deleted; - /* case 2: Score update operation */ + /* Update score */ de = dictFind(zs->dict,ele); redisAssert(de != NULL); - oldscore = dictGetEntryVal(de); - if (*score != *oldscore) { - int deleted; + curobj = dictGetEntryKey(de); + curscore = dictGetEntryVal(de); - /* Remove and insert the element in the skip list with new score */ - deleted = zslDelete(zs->zsl,*oldscore,ele); + /* When the score is updated, reuse the existing string object to + * prevent extra alloc/dealloc of strings on ZINCRBY. */ + if (score != *curscore) { + deleted = zslDelete(zs->zsl,*curscore,curobj); redisAssert(deleted != 0); - zslInsert(zs->zsl,*score,ele); - incrRefCount(ele); - /* Update the score in the hash table */ - dictReplace(zs->dict,ele,score); + znode = zslInsert(zs->zsl,score,curobj); + incrRefCount(curobj); + + /* Update the score in the current dict entry */ + dictGetEntryVal(de) = &znode->score; touchWatchedKey(c->db,c->argv[1]); server.dirty++; - } else { - zfree(score); } - if (doincrement) - addReplyDouble(c,*score); + if (incr) + addReplyDouble(c,score); else addReply(c,shared.czero); } @@@ -425,7 -406,7 +405,7 @@@ void zremCommand(redisClient *c) robj *zsetobj; zset *zs; dictEntry *de; - double *oldscore; + double curscore; int deleted; if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || @@@ -438,8 -419,8 +418,8 @@@ return; } /* Delete from the skiplist */ - oldscore = dictGetEntryVal(de); - deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]); + curscore = *(double*)dictGetEntryVal(de); + deleted = zslDelete(zs->zsl,curscore,c->argv[2]); redisAssert(deleted != 0); /* Delete from the hash table */ @@@ -553,6 -534,7 +533,7 @@@ void zunionInterGenericCommand(redisCli zsetopsrc *src; robj *dstobj; zset *dstzset; + zskiplistNode *znode; dictIterator *di; dictEntry *de; int touched = 0; @@@ -560,8 -542,7 +541,8 @@@ /* expect setnum input keys to be given */ setnum = atoi(c->argv[2]->ptr); if (setnum < 1) { - addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n")); + addReplyError(c, + "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); return; } @@@ -662,10 -643,10 +643,10 @@@ zfree(score); } else { robj *o = dictGetEntryKey(de); - dictAdd(dstzset->dict,o,score); - incrRefCount(o); /* added to dictionary */ - zslInsert(dstzset->zsl,*score,o); + znode = zslInsert(dstzset->zsl,*score,o); incrRefCount(o); /* added to skiplist */ + dictAdd(dstzset->dict,o,&znode->score); + incrRefCount(o); /* added to dictionary */ } } dictReleaseIterator(di); @@@ -693,10 -674,10 +674,10 @@@ } robj *o = dictGetEntryKey(de); - dictAdd(dstzset->dict,o,score); - incrRefCount(o); /* added to dictionary */ - zslInsert(dstzset->zsl,*score,o); + znode = zslInsert(dstzset->zsl,*score,o); incrRefCount(o); /* added to skiplist */ + dictAdd(dstzset->dict,o,&znode->score); + incrRefCount(o); /* added to dictionary */ } dictReleaseIterator(di); } @@@ -778,17 -759,18 +759,17 @@@ void zrangeGenericCommand(redisClient * ln = start == 0 ? zsl->tail : zslistTypeGetElementByRank(zsl, llen-start); } else { ln = start == 0 ? - zsl->header->forward[0] : zslistTypeGetElementByRank(zsl, start+1); + zsl->header->level[0].forward : zslistTypeGetElementByRank(zsl, start+1); } /* Return the result in form of a multi-bulk reply */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n", - withscores ? (rangelen*2) : rangelen)); + addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen); for (j = 0; j < rangelen; j++) { ele = ln->obj; addReplyBulk(c,ele); if (withscores) addReplyDouble(c,ln->score); - ln = reverse ? ln->backward : ln->forward[0]; + ln = reverse ? ln->backward : ln->level[0].forward; } } @@@ -839,7 -821,8 +820,7 @@@ void genericZrangebyscoreCommand(redisC if (c->argc != (4 + withscores) && c->argc != (7 + withscores)) badsyntax = 1; if (badsyntax) { - addReplySds(c, - sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n")); + addReplyError(c,"wrong number of arguments for ZRANGEBYSCORE"); return; } @@@ -864,14 -847,13 +845,14 @@@ zset *zsetobj = o->ptr; zskiplist *zsl = zsetobj->zsl; zskiplistNode *ln; - robj *ele, *lenobj = NULL; + robj *ele; + void *replylen = NULL; unsigned long rangelen = 0; /* Get the first node with the score >= min, or with * score > min if 'minex' is true. */ ln = zslFirstWithScore(zsl,min); - while (minex && ln && ln->score == min) ln = ln->forward[0]; + while (minex && ln && ln->score == min) ln = ln->level[0].forward; if (ln == NULL) { /* No element matching the speciifed interval */ @@@ -883,13 -865,16 +864,13 @@@ * are in the list, so we push this object that will represent * the multi-bulk length in the output buffer, and will "fix" * it later */ - if (!justcount) { - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); - } + if (!justcount) + replylen = addDeferredMultiBulkLength(c); while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) { if (offset) { offset--; - ln = ln->forward[0]; + ln = ln->level[0].forward; continue; } if (limit == 0) break; @@@ -899,14 -884,14 +880,14 @@@ if (withscores) addReplyDouble(c,ln->score); } - ln = ln->forward[0]; + ln = ln->level[0].forward; rangelen++; if (limit > 0) limit--; } if (justcount) { addReplyLongLong(c,(long)rangelen); } else { - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n", + setDeferredMultiBulkLength(c,replylen, withscores ? (rangelen*2) : rangelen); } } @@@ -929,7 -914,7 +910,7 @@@ void zcardCommand(redisClient *c) checkType(c,o,REDIS_ZSET)) return; zs = o->ptr; - addReplyUlong(c,zs->zsl->length); + addReplyLongLong(c,zs->zsl->length); } void zscoreCommand(redisClient *c) {