From: Pieter Noordhuis Date: Wed, 6 Apr 2011 14:15:01 +0000 (+0200) Subject: Merge branch 'unstable' into unstable-zset X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/7d8c555e92954b02d40c15702476fd48e55c4f94?ds=sidebyside;hp=-c Merge branch 'unstable' into unstable-zset Conflicts: src/object.c --- 7d8c555e92954b02d40c15702476fd48e55c4f94 diff --combined src/config.c index fec72a45,e40fdeda..f6c6ad68 --- a/src/config.c +++ b/src/config.c @@@ -261,10 -261,6 +261,10 @@@ void loadServerConfig(char *filename) server.list_max_ziplist_value = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"set-max-intset-entries") && argc == 2) { server.set_max_intset_entries = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"zset-max-ziplist-entries") && argc == 2) { + server.zset_max_ziplist_entries = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"zset-max-ziplist-value") && argc == 2) { + server.zset_max_ziplist_value = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"rename-command") && argc == 3) { struct redisCommand *cmd = lookupCommand(argv[1]); int retval; @@@ -289,6 -285,10 +289,10 @@@ err = "Target command name already exists"; goto loaderr; } } + } else if (!strcasecmp(argv[0],"cluster-enabled") && argc == 2) { + if ((server.cluster_enabled = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@@ -447,12 -447,6 +451,12 @@@ void configSetCommand(redisClient *c) } else if (!strcasecmp(c->argv[2]->ptr,"set-max-intset-entries")) { if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt; server.set_max_intset_entries = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"zset-max-ziplist-entries")) { + if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt; + server.zset_max_ziplist_entries = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"zset-max-ziplist-value")) { + if (getLongLongFromObject(o,&ll) == 
REDIS_ERR || ll < 0) goto badfmt; + server.zset_max_ziplist_value = ll; } else { addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s", (char*)c->argv[2]->ptr); @@@ -604,16 -598,6 +608,16 @@@ void configGetCommand(redisClient *c) addReplyBulkLongLong(c,server.set_max_intset_entries); matches++; } + if (stringmatch(pattern,"zset-max-ziplist-entries",0)) { + addReplyBulkCString(c,"zset-max-ziplist-entries"); + addReplyBulkLongLong(c,server.zset_max_ziplist_entries); + matches++; + } + if (stringmatch(pattern,"zset-max-ziplist-value",0)) { + addReplyBulkCString(c,"zset-max-ziplist-value"); + addReplyBulkLongLong(c,server.zset_max_ziplist_value); + matches++; + } setDeferredMultiBulkLength(c,replylen,matches*2); } diff --combined src/object.c index c384d600,6a9b0214..4bf1df01 --- a/src/object.c +++ b/src/object.c @@@ -93,18 -93,15 +93,22 @@@ robj *createHashObject(void) robj *createZsetObject(void) { zset *zs = zmalloc(sizeof(*zs)); + robj *o; + zs->dict = dictCreate(&zsetDictType,NULL); zs->zsl = zslCreate(); - return createObject(REDIS_ZSET,zs); + o = createObject(REDIS_ZSET,zs); + o->encoding = REDIS_ENCODING_SKIPLIST; + return o; } +robj *createZsetZiplistObject(void) { + unsigned char *zl = ziplistNew(); + robj *o = createObject(REDIS_ZSET,zl); + o->encoding = REDIS_ENCODING_ZIPLIST; + return o; +} + void freeStringObject(robj *o) { if (o->encoding == REDIS_ENCODING_RAW) { sdsfree(o->ptr); @@@ -138,20 -135,11 +142,20 @@@ void freeSetObject(robj *o) } void freeZsetObject(robj *o) { - zset *zs = o->ptr; - - dictRelease(zs->dict); - zslFree(zs->zsl); - zfree(zs); + zset *zs; + switch (o->encoding) { + case REDIS_ENCODING_RAW: case REDIS_ENCODING_SKIPLIST: /* createZsetObject() tags dict+skiplist zsets as SKIPLIST */ + zs = o->ptr; + dictRelease(zs->dict); + zslFree(zs->zsl); + zfree(zs); + break; + case REDIS_ENCODING_ZIPLIST: + zfree(o->ptr); + break; + default: + redisPanic("Unknown sorted set encoding"); + } } void freeHashObject(robj *o) { @@@ -417,6 -405,7 +421,7 @@@ char *strEncoding(int encoding) case REDIS_ENCODING_LINKEDLIST: return 
"linkedlist"; case REDIS_ENCODING_ZIPLIST: return "ziplist"; case REDIS_ENCODING_INTSET: return "intset"; + case REDIS_ENCODING_SKIPLIST: return "skiplist"; default: return "unknown"; } } @@@ -431,3 -420,42 +436,42 @@@ unsigned long estimateObjectIdleTime(ro REDIS_LRU_CLOCK_RESOLUTION; } } + + /* This is an helper function for the DEBUG command. We need to lookup keys + * without any modification of LRU or other parameters. */ + robj *objectCommandLookup(redisClient *c, robj *key) { + dictEntry *de; + + if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL; + return (robj*) dictGetEntryVal(de); + } + + robj *objectCommandLookupOrReply(redisClient *c, robj *key, robj *reply) { + robj *o = objectCommandLookup(c,key); + + if (!o) addReply(c, reply); + return o; + } + + /* Object command allows to inspect the internals of an Redis Object. + * Usage: OBJECT ... arguments ... */ + void objectCommand(redisClient *c) { + robj *o; + + if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) { + if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk)) + == NULL) return; + addReplyLongLong(c,o->refcount); + } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) { + if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk)) + == NULL) return; + addReplyBulkCString(c,strEncoding(o->encoding)); + } else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) { + if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk)) + == NULL) return; + addReplyLongLong(c,estimateObjectIdleTime(o)); + } else { + addReplyError(c,"Syntax error. Try OBJECT (refcount|encoding|idletime)"); + } + } + diff --combined src/rdb.c index d2c902d7,f14467d1..27390b9c --- a/src/rdb.c +++ b/src/rdb.c @@@ -245,7 -245,7 +245,7 @@@ int rdbSaveDoubleValue(FILE *fp, doubl return rdbWriteRaw(fp,buf,len); } - /* Save a Redis object. */ + /* Save a Redis object. Returns -1 on error, 0 on success. 
*/ int rdbSaveObject(FILE *fp, robj *o) { int n, nwritten = 0; @@@ -302,33 -302,24 +302,33 @@@ redisPanic("Unknown set encoding"); } } else if (o->type == REDIS_ZSET) { - /* Save a set value */ - zset *zs = o->ptr; - dictIterator *di = dictGetIterator(zs->dict); - dictEntry *de; - - if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1; - nwritten += n; - - while((de = dictNext(di)) != NULL) { - robj *eleobj = dictGetEntryKey(de); - double *score = dictGetEntryVal(de); + /* Save a sorted set value */ + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + size_t l = ziplistBlobLen((unsigned char*)o->ptr); - if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1; + if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1; nwritten += n; - if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1; + } else if (o->encoding == REDIS_ENCODING_RAW || o->encoding == REDIS_ENCODING_SKIPLIST) { /* createZsetObject() tags dict+skiplist zsets as SKIPLIST */ + zset *zs = o->ptr; + dictIterator *di = dictGetIterator(zs->dict); + dictEntry *de; + + if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1; nwritten += n; + + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetEntryKey(de); + double *score = dictGetEntryVal(de); + + if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1; + nwritten += n; + if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1; + nwritten += n; + } + dictReleaseIterator(di); + } else { + redisPanic("Unknown sorted set encoding"); } - dictReleaseIterator(di); } else if (o->type == REDIS_HASH) { /* Save a hash value */ if (o->encoding == REDIS_ENCODING_ZIPMAP) { @@@ -395,8 -386,6 +395,8 @@@ int rdbSaveKeyValuePair(FILE *fp, robj vtype = REDIS_LIST_ZIPLIST; else if (vtype == REDIS_SET && val->encoding == REDIS_ENCODING_INTSET) vtype = REDIS_SET_INTSET; + else if (vtype == REDIS_ZSET && val->encoding == REDIS_ENCODING_ZIPLIST) + vtype = REDIS_ZSET_ZIPLIST; /* Save type, key, value */ if (rdbSaveType(fp,vtype) == -1) return -1; if (rdbSaveStringObject(fp,key) == -1) return -1; @@@ -756,13 -745,11 +756,13 @@@ robj *rdbLoadObject(int 
type, FILE *fp } else if (type == REDIS_ZSET) { /* Read list/set value */ size_t zsetlen; + size_t maxelelen = 0; zset *zs; if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; o = createZsetObject(); zs = o->ptr; + /* Load every single element of the list/set */ while(zsetlen--) { robj *ele; @@@ -772,21 -759,10 +772,21 @@@ if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; ele = tryObjectEncoding(ele); if (rdbLoadDoubleValue(fp,&score) == -1) return NULL; + + /* Don't care about integer-encoded strings. */ + if (ele->encoding == REDIS_ENCODING_RAW && + sdslen(ele->ptr) > maxelelen) + maxelelen = sdslen(ele->ptr); + znode = zslInsert(zs->zsl,score,ele); dictAdd(zs->dict,ele,&znode->score); incrRefCount(ele); /* added to skiplist */ } + + /* Convert *after* loading, since sorted sets are not stored ordered. */ + if (zsetLength(o) <= server.zset_max_ziplist_entries && + maxelelen <= server.zset_max_ziplist_value) + zsetConvert(o,REDIS_ENCODING_ZIPLIST); } else if (type == REDIS_HASH) { size_t hashlen; @@@ -835,8 -811,7 +835,8 @@@ } } else if (type == REDIS_HASH_ZIPMAP || type == REDIS_LIST_ZIPLIST || - type == REDIS_SET_INTSET) + type == REDIS_SET_INTSET || + type == REDIS_ZSET_ZIPLIST) { robj *aux = rdbLoadStringObject(fp); @@@ -871,12 -846,6 +871,12 @@@ if (intsetLen(o->ptr) > server.set_max_intset_entries) setTypeConvert(o,REDIS_ENCODING_HT); break; + case REDIS_ZSET_ZIPLIST: + o->type = REDIS_ZSET; + o->encoding = REDIS_ENCODING_ZIPLIST; + if (zsetLength(o) > server.zset_max_ziplist_entries) + zsetConvert(o,REDIS_ENCODING_RAW); + break; default: redisPanic("Unknown enoding"); break; diff --combined src/redis.c index 2b98d40c,9c726151..3d0f5378 --- a/src/redis.c +++ b/src/redis.c @@@ -70,12 -70,12 +70,12 @@@ struct redisServer server; /* server gl struct redisCommand *commandTable; struct redisCommand redisCommandTable[] = { {"get",getCommand,2,0,NULL,1,1,1,0,0}, - {"set",setCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0}, - 
{"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0}, - {"setex",setexCommand,4,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0}, + {"set",setCommand,3,REDIS_CMD_DENYOOM,noPreloadGetKeys,1,1,1,0,0}, + {"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,noPreloadGetKeys,1,1,1,0,0}, + {"setex",setexCommand,4,REDIS_CMD_DENYOOM,noPreloadGetKeys,1,1,1,0,0}, /* SETEX key seconds value: the key is argv[1] */ {"append",appendCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0}, {"strlen",strlenCommand,2,0,NULL,1,1,1,0,0}, - {"del",delCommand,-2,0,NULL,0,0,0,0,0}, + {"del",delCommand,-2,0,noPreloadGetKeys,1,-1,1,0,0}, {"exists",existsCommand,2,0,NULL,1,1,1,0,0}, {"setbit",setbitCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0}, {"getbit",getbitCommand,3,0,NULL,1,1,1,0,0}, @@@ -94,7 -94,7 +94,7 @@@ {"lpop",lpopCommand,2,0,NULL,1,1,1,0,0}, {"brpop",brpopCommand,-3,0,NULL,1,1,1,0,0}, {"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1,0,0}, - {"blpop",blpopCommand,-3,0,NULL,1,1,1,0,0}, + {"blpop",blpopCommand,-3,0,NULL,1,-2,1,0,0}, {"llen",llenCommand,2,0,NULL,1,1,1,0,0}, {"lindex",lindexCommand,3,0,NULL,1,1,1,0,0}, {"lset",lsetCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0}, @@@ -121,8 -121,8 +121,8 @@@ {"zrem",zremCommand,3,0,NULL,1,1,1,0,0}, {"zremrangebyscore",zremrangebyscoreCommand,4,0,NULL,1,1,1,0,0}, {"zremrangebyrank",zremrangebyrankCommand,4,0,NULL,1,1,1,0,0}, - {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0,0,0}, - {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0,0,0}, + {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0}, + {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0}, {"zrange",zrangeCommand,-4,0,NULL,1,1,1,0,0}, {"zrangebyscore",zrangebyscoreCommand,-4,0,NULL,1,1,1,0,0}, {"zrevrangebyscore",zrevrangebyscoreCommand,-4,0,NULL,1,1,1,0,0}, @@@ -152,10 -152,10 +152,10 @@@ {"randomkey",randomkeyCommand,1,0,NULL,0,0,0,0,0}, {"select",selectCommand,2,0,NULL,0,0,0,0,0}, 
{"move",moveCommand,3,0,NULL,1,1,1,0,0}, - {"rename",renameCommand,3,0,NULL,1,1,1,0,0}, - {"renamenx",renamenxCommand,3,0,NULL,1,1,1,0,0}, - {"expire",expireCommand,3,0,NULL,0,0,0,0,0}, - {"expireat",expireatCommand,3,0,NULL,0,0,0,0,0}, + {"rename",renameCommand,3,0,renameGetKeys,1,2,1,0,0}, + {"renamenx",renamenxCommand,3,0,renameGetKeys,1,2,1,0,0}, + {"expire",expireCommand,3,0,NULL,1,1,1,0,0}, + {"expireat",expireatCommand,3,0,NULL,1,1,1,0,0}, {"keys",keysCommand,2,0,NULL,0,0,0,0,0}, {"dbsize",dbsizeCommand,1,0,NULL,0,0,0,0,0}, {"auth",authCommand,2,0,NULL,0,0,0,0,0}, @@@ -168,7 -168,7 +168,7 @@@ {"lastsave",lastsaveCommand,1,0,NULL,0,0,0,0,0}, {"type",typeCommand,2,0,NULL,1,1,1,0,0}, {"multi",multiCommand,1,0,NULL,0,0,0,0,0}, - {"exec",execCommand,1,REDIS_CMD_DENYOOM,execBlockClientOnSwappedKeys,0,0,0,0,0}, + {"exec",execCommand,1,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0}, {"discard",discardCommand,1,0,NULL,0,0,0,0,0}, {"sync",syncCommand,1,0,NULL,0,0,0,0,0}, {"flushdb",flushdbCommand,1,0,NULL,0,0,0,0,0}, @@@ -186,8 -186,13 +186,13 @@@ {"psubscribe",psubscribeCommand,-2,0,NULL,0,0,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,0,NULL,0,0,0,0,0}, {"publish",publishCommand,3,REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0,0,0}, - {"watch",watchCommand,-2,0,NULL,0,0,0,0,0}, - {"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0} + {"watch",watchCommand,-2,0,noPreloadGetKeys,1,-1,1,0,0}, + {"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0}, + {"cluster",clusterCommand,-2,0,NULL,0,0,0,0,0}, + {"restore",restoreCommand,4,0,NULL,0,0,0,0,0}, + {"migrate",migrateCommand,6,0,NULL,0,0,0,0,0}, + {"dump",dumpCommand,2,0,NULL,0,0,0,0,0}, + {"object",objectCommand,-2,0,NULL,0,0,0,0,0} }; /*============================ Utility functions ============================ */ @@@ -440,6 -445,17 +445,17 @@@ dictType keylistDictType = dictListDestructor /* val destructor */ }; + /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to + * clusterNode structures. 
*/ + dictType clusterNodesDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL /* val destructor */ + }; + int htNeedsResize(dict *dict) { long long size, used; @@@ -669,6 -685,9 +685,9 @@@ int serverCron(struct aeEventLoop *even * to detect transfer failures. */ if (!(loops % 10)) replicationCron(); + /* Run other sub-systems specific cron jobs */ + if (server.cluster_enabled && !(loops % 10)) clusterCron(); + server.cronloops++; return 100; } @@@ -821,10 -840,10 +840,12 @@@ void initServerConfig() server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES; server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE; server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES; + server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES; + server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE; server.shutdown_asap = 0; server.cache_flush_delay = 0; + server.cluster_enabled = 0; + server.cluster.configfile = zstrdup("nodes.conf"); updateLRUClock(); resetServerSaveParams(); @@@ -947,6 -966,7 +968,7 @@@ void initServer() } if (server.ds_enabled) dsInit(); + if (server.cluster_enabled) clusterInit(); srand(time(NULL)^getpid()); } @@@ -1053,6 -1073,27 +1075,27 @@@ int processCommand(redisClient *c) return REDIS_OK; } + /* If cluster is enabled, redirect here */ + if (server.cluster_enabled && + !(cmd->getkeys_proc == NULL && cmd->firstkey == 0)) { + int hashslot; + + if (server.cluster.state != REDIS_CLUSTER_OK) { + addReplyError(c,"The cluster is down. 
Check with CLUSTER INFO for more information"); + return REDIS_OK; + } else { + clusterNode *n = getNodeByQuery(c,cmd,c->argv,c->argc,&hashslot); + if (n == NULL) { + addReplyError(c,"Invalid cross-node request"); + return REDIS_OK; + } else if (n != server.cluster.myself) { + addReplySds(c,sdscatprintf(sdsempty(), + "-MOVED %d %s:%d\r\n",hashslot,n->ip,n->port)); + return REDIS_OK; + } + } + } + /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile diff --combined src/redis.h index 1d6d49dc,26f33451..32dcc359 --- a/src/redis.h +++ b/src/redis.h @@@ -18,6 -18,7 +18,7 @@@ #include #include #include + #include #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@@ -70,12 -71,10 +71,12 @@@ #define REDIS_ZSET 3 #define REDIS_HASH 4 #define REDIS_VMPOINTER 8 + /* Object types only used for persistence in .rdb files */ #define REDIS_HASH_ZIPMAP 9 #define REDIS_LIST_ZIPLIST 10 #define REDIS_SET_INTSET 11 +#define REDIS_ZSET_ZIPLIST 12 /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. 
The 'encoding' field of the object @@@ -87,6 -86,7 +88,7 @@@ #define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */ #define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ #define REDIS_ENCODING_INTSET 6 /* Encoded as intset */ + #define REDIS_ENCODING_SKIPLIST 7 /* Encoded as skiplist */ /* Object types only used for dumping to disk */ #define REDIS_EXPIRETIME 253 @@@ -197,8 -197,6 +199,8 @@@ #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512 #define REDIS_LIST_MAX_ZIPLIST_VALUE 64 #define REDIS_SET_MAX_INTSET_ENTRIES 512 +#define REDIS_ZSET_MAX_ZIPLIST_ENTRIES 128 +#define REDIS_ZSET_MAX_ZIPLIST_VALUE 64 /* Sets operations codes */ #define REDIS_OP_UNION 0 @@@ -364,7 -362,124 +366,124 @@@ struct sharedObjectsStruct *integers[REDIS_SHARED_INTEGERS]; }; - /* Global server state structure */ + /*----------------------------------------------------------------------------- + * Redis cluster data structures + *----------------------------------------------------------------------------*/ + + #define REDIS_CLUSTER_SLOTS 4096 + #define REDIS_CLUSTER_OK 0 /* Everything looks ok */ + #define REDIS_CLUSTER_FAIL 1 /* The cluster can't work */ + #define REDIS_CLUSTER_NEEDHELP 2 /* The cluster works, but needs some help */ + #define REDIS_CLUSTER_NAMELEN 40 /* sha1 hex length */ + #define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */ + + struct clusterNode; + + /* clusterLink encapsulates everything needed to talk with a remote node. */ + typedef struct clusterLink { + int fd; /* TCP socket file descriptor */ + sds sndbuf; /* Packet send buffer */ + sds rcvbuf; /* Packet reception buffer */ + struct clusterNode *node; /* Node related to this link if any, or NULL */ + } clusterLink; + + /* Node flags */ + #define REDIS_NODE_MASTER 1 /* The node is a master */ + #define REDIS_NODE_SLAVE 2 /* The node is a slave */ + #define REDIS_NODE_PFAIL 4 /* Failure? 
Need acknowledge */ + #define REDIS_NODE_FAIL 8 /* The node is believed to be malfunctioning */ + #define REDIS_NODE_MYSELF 16 /* This node is myself */ + #define REDIS_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */ + #define REDIS_NODE_NOADDR 64 /* We don't know the address of this node */ + #define REDIS_NODE_MEET 128 /* Send a MEET message to this node */ + #define REDIS_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" + + struct clusterNode { + char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ + int flags; /* REDIS_NODE_... */ + unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */ + int numslaves; /* Number of slave nodes, if this is a master */ + struct clusterNode **slaves; /* pointers to slave nodes */ + struct clusterNode *slaveof; /* pointer to the master node */ + time_t ping_sent; /* Unix time we sent latest ping */ + time_t pong_received; /* Unix time we received the pong */ + char *configdigest; /* Configuration digest of this node */ + time_t configdigest_ts; /* Configuration digest timestamp */ + char ip[16]; /* Latest known IP address of this node */ + int port; /* Latest known port of this node */ + clusterLink *link; /* TCP/IP link with this node */ + }; + typedef struct clusterNode clusterNode; + + typedef struct { + char *configfile; + clusterNode *myself; /* This node */ + int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */ + int node_timeout; + dict *nodes; /* Hash table of name -> clusterNode structures */ + clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS]; + clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS]; + clusterNode *slots[REDIS_CLUSTER_SLOTS]; + } clusterState; + + /* Redis cluster messages header */ + + /* Note that the PING, PONG and MEET messages are actually the same exact + * kind of packet. 
PONG is the reply to ping, in the extact format as a PING, + * while MEET is a special PING that forces the receiver to add the sender + * as a node (if it is not already in the list). */ + #define CLUSTERMSG_TYPE_PING 0 /* Ping */ + #define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */ + #define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */ + #define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */ + + /* Initially we don't know our "name", but we'll find it once we connect + * to the first node, using the getsockname() function. Then we'll use this + * address for all the next messages. */ + typedef struct { + char nodename[REDIS_CLUSTER_NAMELEN]; + uint32_t ping_sent; + uint32_t pong_received; + char ip[16]; /* IP address last time it was seen */ + uint16_t port; /* port last time it was seen */ + uint16_t flags; + uint32_t notused; /* for 64 bit alignment */ + } clusterMsgDataGossip; + + typedef struct { + char nodename[REDIS_CLUSTER_NAMELEN]; + } clusterMsgDataFail; + + union clusterMsgData { + /* PING, MEET and PONG */ + struct { + /* Array of N clusterMsgDataGossip structures */ + clusterMsgDataGossip gossip[1]; + } ping; + /* FAIL */ + struct { + clusterMsgDataFail about; + } fail; + }; + + typedef struct { + uint32_t totlen; /* Total length of this message */ + uint16_t type; /* Message type */ + uint16_t count; /* Only used for some kind of messages. */ + char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */ + unsigned char myslots[REDIS_CLUSTER_SLOTS/8]; + char slaveof[REDIS_CLUSTER_NAMELEN]; + char configdigest[32]; + uint16_t port; /* Sender TCP base port */ + unsigned char state; /* Cluster state from the POV of the sender */ + unsigned char notused[5]; /* Reserved for future use. For alignment. 
*/ + union clusterMsgData data; + } clusterMsg; + + /*----------------------------------------------------------------------------- + * Global server state + *----------------------------------------------------------------------------*/ + struct redisServer { /* General */ pthread_t mainthread; @@@ -377,6 -492,7 +496,7 @@@ char *unixsocket; int ipfd; int sofd; + int cfd; list *clients; list *slaves, *monitors; char neterr[ANET_ERR_LEN]; @@@ -472,8 -588,6 +592,8 @@@ size_t list_max_ziplist_entries; size_t list_max_ziplist_value; size_t set_max_intset_entries; + size_t zset_max_ziplist_entries; + size_t zset_max_ziplist_value; time_t unixtime; /* Unix time sampled every second. */ /* Virtual memory I/O threads stuff */ /* An I/O thread process an element taken from the io_jobs queue and @@@ -505,6 -619,8 +625,8 @@@ /* Misc */ unsigned lruclock:22; /* clock incrementing every minute, for LRU */ unsigned lruclock_padding:10; + int cluster_enabled; + clusterState cluster; }; typedef struct pubsubPattern { @@@ -513,20 -629,19 +635,19 @@@ } pubsubPattern; typedef void redisCommandProc(redisClient *c); - typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); + typedef int *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags); struct redisCommand { char *name; redisCommandProc *proc; int arity; int flags; - /* Use a function to determine which keys need to be loaded - * in the background prior to executing this command. Takes precedence - * over vm_firstkey and others, ignored when NULL */ - redisVmPreloadProc *vm_preload_proc; + /* Use a function to determine keys arguments in a command line. + * Used both for diskstore preloading and Redis Cluster. */ + redisGetKeysProc *getkeys_proc; /* What keys should be loaded in background when calling this command? 
*/ - int vm_firstkey; /* The first argument that's a key (0 = no keys) */ - int vm_lastkey; /* THe last argument that's a key */ - int vm_keystep; /* The step between first and last key */ + int firstkey; /* The first argument that's a key (0 = no keys) */ + int lastkey; /* THe last argument that's a key */ + int keystep; /* The step between first and last key */ long long microseconds, calls; }; @@@ -640,6 -755,7 +761,7 @@@ extern struct redisServer server extern struct sharedObjectsStruct shared; extern dictType setDictType; extern dictType zsetDictType; + extern dictType clusterNodesDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; dictType hashDictType; @@@ -736,7 -852,6 +858,7 @@@ robj *createSetObject(void) robj *createIntsetObject(void); robj *createHashObject(void); robj *createZsetObject(void); +robj *createZsetZiplistObject(void); int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg); int checkType(redisClient *c, robj *o, int type); int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, const char *msg); @@@ -755,6 -870,7 +877,7 @@@ int fwriteBulkString(FILE *fp, char *s int fwriteBulkDouble(FILE *fp, double d); int fwriteBulkLongLong(FILE *fp, long long l); int fwriteBulkObject(FILE *fp, robj *obj); + int fwriteBulkCount(FILE *fp, char prefix, int count); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); @@@ -799,12 -915,6 +922,12 @@@ void backgroundRewriteDoneHandler(int e zskiplist *zslCreate(void); void zslFree(zskiplist *zsl); zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj); +unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score); +double zzlGetScore(unsigned char *sptr); +void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); +void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); +unsigned int zsetLength(robj *zobj); +void zsetConvert(robj *zobj, int encoding); /* 
Core functions */ void freeMemoryIfNeeded(void); @@@ -842,8 -952,6 +965,6 @@@ void freeIOJob(iojob *j) void queueIOJob(iojob *j); void waitEmptyIOJobsQueue(void); void processAllPendingIOJobs(void); - void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); - void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); int dontWaitForSwappedKey(redisClient *c, robj *key); void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); @@@ -900,8 -1008,6 +1021,8 @@@ int stringmatchlen(const char *pattern int stringmatch(const char *pattern, const char *string, int nocase); long long memtoll(const char *p, int *err); int ll2string(char *s, size_t len, long long value); +int string2ll(char *s, size_t len, long long *value); +int d2string(char *s, size_t len, double value); int isStringRepresentableAsLong(sds s, long *longval); int isStringRepresentableAsLongLong(sds s, long long *longval); int isObjectRepresentableAsLongLong(robj *o, long long *llongval); @@@ -932,6 -1038,24 +1053,24 @@@ int selectDb(redisClient *c, int id) void signalModifiedKey(redisDb *db, robj *key); void signalFlushedDb(int dbid); + /* API to get key arguments from commands */ + #define REDIS_GETKEYS_ALL 0 + #define REDIS_GETKEYS_PRELOAD 1 + int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags); + void getKeysFreeResult(int *result); + int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); + int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); + int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); + + /* Cluster */ + void clusterInit(void); + unsigned short crc16(const char *buf, int len); + unsigned int keyHashSlot(char *key, int keylen); + clusterNode 
*createClusterNode(char *nodename, int flags); + int clusterAddNode(clusterNode *node); + void clusterCron(void); + clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot); + /* Git SHA1 */ char *redisGitSHA1(void); char *redisGitDirty(void); @@@ -1054,6 -1178,11 +1193,11 @@@ void punsubscribeCommand(redisClient *c void publishCommand(redisClient *c); void watchCommand(redisClient *c); void unwatchCommand(redisClient *c); + void clusterCommand(redisClient *c); + void restoreCommand(redisClient *c); + void migrateCommand(redisClient *c); + void dumpCommand(redisClient *c); + void objectCommand(redisClient *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --combined src/ziplist.c index 44d63c78,55bb662b..1c492f25 --- a/src/ziplist.c +++ b/src/ziplist.c @@@ -68,6 -68,7 +68,7 @@@ #include #include "zmalloc.h" #include "ziplist.h" + #include "endian.h" int ll2string(char *s, size_t len, long long value); @@@ -207,6 -208,7 +208,7 @@@ static unsigned int zipPrevDecodeLength } else { if (lensize) *lensize = 1+sizeof(len); memcpy(&len,p+1,sizeof(len)); + memrev32ifbe(&len); } return len; } @@@ -223,6 -225,7 +225,7 @@@ static unsigned int zipPrevEncodeLength } else { p[0] = ZIP_BIGLEN; memcpy(p+1,&len,sizeof(len)); + memrev32ifbe(p+1); return 1+sizeof(len); } } @@@ -234,6 -237,7 +237,7 @@@ static void zipPrevEncodeLengthForceLar if (p == NULL) return; p[0] = ZIP_BIGLEN; memcpy(p+1,&len,sizeof(len)); + memrev32ifbe(p+1); } /* Return the difference in number of bytes needed to store the new length @@@ -287,12 -291,15 +291,15 @@@ static void zipSaveInteger(unsigned cha if (encoding == ZIP_INT_16B) { i16 = value; memcpy(p,&i16,sizeof(i16)); + memrev16ifbe(p); } else if (encoding == ZIP_INT_32B) { i32 = value; memcpy(p,&i32,sizeof(i32)); + memrev32ifbe(p); } else if (encoding == ZIP_INT_64B) { i64 = value; memcpy(p,&i64,sizeof(i64)); + memrev64ifbe(p); } else { assert(NULL); } 
@@@ -305,12 -312,15 +312,15 @@@ static int64_t zipLoadInteger(unsigned int64_t i64, ret = 0; if (encoding == ZIP_INT_16B) { memcpy(&i16,p,sizeof(i16)); + memrev16ifbe(&i16); ret = i16; } else if (encoding == ZIP_INT_32B) { memcpy(&i32,p,sizeof(i32)); + memrev32ifbe(&i32); /* 4-byte value: must mirror memrev32ifbe in zipSaveInteger */ ret = i32; } else if (encoding == ZIP_INT_64B) { memcpy(&i64,p,sizeof(i64)); + memrev64ifbe(&i64); /* 8-byte value: must mirror memrev64ifbe in zipSaveInteger */ ret = i64; } else { assert(NULL); @@@ -375,8 -385,8 +385,8 @@@ static unsigned char *ziplistResize(uns * The pointer "p" points to the first entry that does NOT need to be * updated, i.e. consecutive fields MAY need an update. */ static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) { - unsigned int curlen = ZIPLIST_BYTES(zl), rawlen, rawlensize; - unsigned int offset, noffset, extra; + size_t curlen = ZIPLIST_BYTES(zl), rawlen, rawlensize; + size_t offset, noffset, extra; unsigned char *np; zlentry cur, next; @@@ -431,8 -441,7 +441,8 @@@ /* Delete "num" entries, starting at "p". Returns pointer to the ziplist. */ static unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) { unsigned int i, totlen, deleted = 0; - int offset, nextdiff = 0; + size_t offset; + int nextdiff = 0; zlentry first, tail; first = zipEntry(p); @@@ -484,9 -493,8 +494,9 @@@ /* Insert item at "p". */ static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) { - unsigned int curlen = ZIPLIST_BYTES(zl), reqlen, prevlen = 0; - unsigned int offset, nextdiff = 0; + size_t curlen = ZIPLIST_BYTES(zl), reqlen, prevlen = 0; + size_t offset; + int nextdiff = 0; unsigned char encoding = 0; long long value; zlentry entry, tail; @@@ -670,7 -678,7 +680,7 @@@ unsigned char *ziplistInsert(unsigned c * Also update *p in place, to be able to iterate over the * ziplist, while deleting entries.
*/ unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p) { - unsigned int offset = *p-zl; + size_t offset = *p-zl; zl = __ziplistDelete(zl,*p,1); /* Store pointer to current element in p, because ziplistDelete will