server.list_max_ziplist_value = memtoll(argv[1], NULL);
} else if (!strcasecmp(argv[0],"set-max-intset-entries") && argc == 2) {
server.set_max_intset_entries = memtoll(argv[1], NULL);
+ } else if (!strcasecmp(argv[0],"zset-max-ziplist-entries") && argc == 2) {
+ server.zset_max_ziplist_entries = memtoll(argv[1], NULL);
+ } else if (!strcasecmp(argv[0],"zset-max-ziplist-value") && argc == 2) {
+ server.zset_max_ziplist_value = memtoll(argv[1], NULL);
} else if (!strcasecmp(argv[0],"rename-command") && argc == 3) {
struct redisCommand *cmd = lookupCommand(argv[1]);
int retval;
err = "Target command name already exists"; goto loaderr;
}
}
+ } else if (!strcasecmp(argv[0],"cluster-enabled") && argc == 2) {
+ if ((server.cluster_enabled = yesnotoi(argv[1])) == -1) {
+ err = "argument must be 'yes' or 'no'"; goto loaderr;
+ }
} else {
err = "Bad directive or wrong number of arguments"; goto loaderr;
}
} else if (!strcasecmp(c->argv[2]->ptr,"set-max-intset-entries")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
server.set_max_intset_entries = ll;
+ } else if (!strcasecmp(c->argv[2]->ptr,"zset-max-ziplist-entries")) {
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
+ server.zset_max_ziplist_entries = ll;
+ } else if (!strcasecmp(c->argv[2]->ptr,"zset-max-ziplist-value")) {
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt;
+ server.zset_max_ziplist_value = ll;
} else {
addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
(char*)c->argv[2]->ptr);
addReplyBulkLongLong(c,server.set_max_intset_entries);
matches++;
}
+ if (stringmatch(pattern,"zset-max-ziplist-entries",0)) {
+ addReplyBulkCString(c,"zset-max-ziplist-entries");
+ addReplyBulkLongLong(c,server.zset_max_ziplist_entries);
+ matches++;
+ }
+ if (stringmatch(pattern,"zset-max-ziplist-value",0)) {
+ addReplyBulkCString(c,"zset-max-ziplist-value");
+ addReplyBulkLongLong(c,server.zset_max_ziplist_value);
+ matches++;
+ }
setDeferredMultiBulkLength(c,replylen,matches*2);
}
robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
+ robj *o;
+
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
- return createObject(REDIS_ZSET,zs);
+ o = createObject(REDIS_ZSET,zs);
+ o->encoding = REDIS_ENCODING_SKIPLIST;
+ return o;
}
+robj *createZsetZiplistObject(void) {
+ unsigned char *zl = ziplistNew();
+ robj *o = createObject(REDIS_ZSET,zl);
+ o->encoding = REDIS_ENCODING_ZIPLIST;
+ return o;
+}
+
void freeStringObject(robj *o) {
if (o->encoding == REDIS_ENCODING_RAW) {
sdsfree(o->ptr);
}
/* Release the payload of a sorted set object according to its encoding.
 *
 * - REDIS_ENCODING_SKIPLIST: the dict + skiplist representation, which is the
 *   encoding createZsetObject() assigns. REDIS_ENCODING_RAW is handled the
 *   same way because other code paths still label this representation RAW.
 * - REDIS_ENCODING_ZIPLIST: a single ziplist allocation.
 *
 * Any other encoding is a programming error and aborts via redisPanic(). */
void freeZsetObject(robj *o) {
    zset *zs;

    switch (o->encoding) {
    case REDIS_ENCODING_SKIPLIST:
    case REDIS_ENCODING_RAW:
        zs = o->ptr;
        dictRelease(zs->dict);
        zslFree(zs->zsl);
        zfree(zs);
        break;
    case REDIS_ENCODING_ZIPLIST:
        zfree(o->ptr);
        break;
    default:
        redisPanic("Unknown sorted set encoding");
    }
}
void freeHashObject(robj *o) {
case REDIS_ENCODING_LINKEDLIST: return "linkedlist";
case REDIS_ENCODING_ZIPLIST: return "ziplist";
case REDIS_ENCODING_INTSET: return "intset";
+ case REDIS_ENCODING_SKIPLIST: return "skiplist";
default: return "unknown";
}
}
REDIS_LRU_CLOCK_RESOLUTION;
}
}
+
+ /* This is an helper function for the DEBUG command. We need to lookup keys
+ * without any modification of LRU or other parameters. */
+ robj *objectCommandLookup(redisClient *c, robj *key) {
+ dictEntry *de;
+
+ if ((de = dictFind(c->db->dict,key->ptr)) == NULL) return NULL;
+ return (robj*) dictGetEntryVal(de);
+ }
+
+ robj *objectCommandLookupOrReply(redisClient *c, robj *key, robj *reply) {
+ robj *o = objectCommandLookup(c,key);
+
+ if (!o) addReply(c, reply);
+ return o;
+ }
+
+ /* Object command allows to inspect the internals of an Redis Object.
+ * Usage: OBJECT <verb> ... arguments ... */
+ void objectCommand(redisClient *c) {
+ robj *o;
+
+ if (!strcasecmp(c->argv[1]->ptr,"refcount") && c->argc == 3) {
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
+ == NULL) return;
+ addReplyLongLong(c,o->refcount);
+ } else if (!strcasecmp(c->argv[1]->ptr,"encoding") && c->argc == 3) {
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
+ == NULL) return;
+ addReplyBulkCString(c,strEncoding(o->encoding));
+ } else if (!strcasecmp(c->argv[1]->ptr,"idletime") && c->argc == 3) {
+ if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
+ == NULL) return;
+ addReplyLongLong(c,estimateObjectIdleTime(o));
+ } else {
+ addReplyError(c,"Syntax error. Try OBJECT (refcount|encoding|idletime)");
+ }
+ }
+
return rdbWriteRaw(fp,buf,len);
}
- /* Save a Redis object. */
+ /* Save a Redis object. Returns -1 on error, 0 on success. */
int rdbSaveObject(FILE *fp, robj *o) {
int n, nwritten = 0;
redisPanic("Unknown set encoding");
}
} else if (o->type == REDIS_ZSET) {
- /* Save a set value */
- zset *zs = o->ptr;
- dictIterator *di = dictGetIterator(zs->dict);
- dictEntry *de;
-
- if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1;
- nwritten += n;
-
- while((de = dictNext(di)) != NULL) {
- robj *eleobj = dictGetEntryKey(de);
- double *score = dictGetEntryVal(de);
+ /* Save a sorted set value */
+ if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+ size_t l = ziplistBlobLen((unsigned char*)o->ptr);
- if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
+ if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
nwritten += n;
- if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1;
+ } else if (o->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = o->ptr;
+ dictIterator *di = dictGetIterator(zs->dict);
+ dictEntry *de;
+
+ if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1;
nwritten += n;
+
+ while((de = dictNext(di)) != NULL) {
+ robj *eleobj = dictGetEntryKey(de);
+ double *score = dictGetEntryVal(de);
+
+ if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
+ nwritten += n;
+ if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1;
+ nwritten += n;
+ }
+ dictReleaseIterator(di);
+ } else {
+ redisPanic("Unknown sorted set enoding");
}
- dictReleaseIterator(di);
} else if (o->type == REDIS_HASH) {
/* Save a hash value */
if (o->encoding == REDIS_ENCODING_ZIPMAP) {
vtype = REDIS_LIST_ZIPLIST;
else if (vtype == REDIS_SET && val->encoding == REDIS_ENCODING_INTSET)
vtype = REDIS_SET_INTSET;
+ else if (vtype == REDIS_ZSET && val->encoding == REDIS_ENCODING_ZIPLIST)
+ vtype = REDIS_ZSET_ZIPLIST;
/* Save type, key, value */
if (rdbSaveType(fp,vtype) == -1) return -1;
if (rdbSaveStringObject(fp,key) == -1) return -1;
} else if (type == REDIS_ZSET) {
/* Read list/set value */
size_t zsetlen;
+ size_t maxelelen = 0;
zset *zs;
if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
o = createZsetObject();
zs = o->ptr;
+
/* Load every single element of the list/set */
while(zsetlen--) {
robj *ele;
if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
ele = tryObjectEncoding(ele);
if (rdbLoadDoubleValue(fp,&score) == -1) return NULL;
+
+ /* Don't care about integer-encoded strings. */
+ if (ele->encoding == REDIS_ENCODING_RAW &&
+ sdslen(ele->ptr) > maxelelen)
+ maxelelen = sdslen(ele->ptr);
+
znode = zslInsert(zs->zsl,score,ele);
dictAdd(zs->dict,ele,&znode->score);
incrRefCount(ele); /* added to skiplist */
}
+
+ /* Convert *after* loading, since sorted sets are not stored ordered. */
+ if (zsetLength(o) <= server.zset_max_ziplist_entries &&
+ maxelelen <= server.zset_max_ziplist_value)
+ zsetConvert(o,REDIS_ENCODING_ZIPLIST);
} else if (type == REDIS_HASH) {
size_t hashlen;
}
} else if (type == REDIS_HASH_ZIPMAP ||
type == REDIS_LIST_ZIPLIST ||
- type == REDIS_SET_INTSET)
+ type == REDIS_SET_INTSET ||
+ type == REDIS_ZSET_ZIPLIST)
{
robj *aux = rdbLoadStringObject(fp);
if (intsetLen(o->ptr) > server.set_max_intset_entries)
setTypeConvert(o,REDIS_ENCODING_HT);
break;
+ case REDIS_ZSET_ZIPLIST:
+ o->type = REDIS_ZSET;
+ o->encoding = REDIS_ENCODING_ZIPLIST;
+ if (zsetLength(o) > server.zset_max_ziplist_entries)
+ zsetConvert(o,REDIS_ENCODING_RAW);
+ break;
default:
redisPanic("Unknown enoding");
break;
struct redisCommand *commandTable;
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,0,NULL,1,1,1,0,0},
- {"set",setCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
- {"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
- {"setex",setexCommand,4,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
+ {"set",setCommand,3,REDIS_CMD_DENYOOM,noPreloadGetKeys,1,1,1,0,0},
+ {"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,noPreloadGetKeys,1,1,1,0,0},
+ {"setex",setexCommand,4,REDIS_CMD_DENYOOM,noPreloadGetKeys,2,2,1,0,0},
{"append",appendCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
{"strlen",strlenCommand,2,0,NULL,1,1,1,0,0},
- {"del",delCommand,-2,0,NULL,0,0,0,0,0},
+ {"del",delCommand,-2,0,noPreloadGetKeys,1,-1,1,0,0},
{"exists",existsCommand,2,0,NULL,1,1,1,0,0},
{"setbit",setbitCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
{"getbit",getbitCommand,3,0,NULL,1,1,1,0,0},
{"lpop",lpopCommand,2,0,NULL,1,1,1,0,0},
{"brpop",brpopCommand,-3,0,NULL,1,1,1,0,0},
{"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1,0,0},
- {"blpop",blpopCommand,-3,0,NULL,1,1,1,0,0},
+ {"blpop",blpopCommand,-3,0,NULL,1,-2,1,0,0},
{"llen",llenCommand,2,0,NULL,1,1,1,0,0},
{"lindex",lindexCommand,3,0,NULL,1,1,1,0,0},
{"lset",lsetCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
{"zrem",zremCommand,3,0,NULL,1,1,1,0,0},
{"zremrangebyscore",zremrangebyscoreCommand,4,0,NULL,1,1,1,0,0},
{"zremrangebyrank",zremrangebyrankCommand,4,0,NULL,1,1,1,0,0},
- {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0,0,0},
- {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0,0,0},
+ {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0},
+ {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0},
{"zrange",zrangeCommand,-4,0,NULL,1,1,1,0,0},
{"zrangebyscore",zrangebyscoreCommand,-4,0,NULL,1,1,1,0,0},
{"zrevrangebyscore",zrevrangebyscoreCommand,-4,0,NULL,1,1,1,0,0},
{"randomkey",randomkeyCommand,1,0,NULL,0,0,0,0,0},
{"select",selectCommand,2,0,NULL,0,0,0,0,0},
{"move",moveCommand,3,0,NULL,1,1,1,0,0},
- {"rename",renameCommand,3,0,NULL,1,1,1,0,0},
- {"renamenx",renamenxCommand,3,0,NULL,1,1,1,0,0},
- {"expire",expireCommand,3,0,NULL,0,0,0,0,0},
- {"expireat",expireatCommand,3,0,NULL,0,0,0,0,0},
+ {"rename",renameCommand,3,0,renameGetKeys,1,2,1,0,0},
+ {"renamenx",renamenxCommand,3,0,renameGetKeys,1,2,1,0,0},
+ {"expire",expireCommand,3,0,NULL,1,1,1,0,0},
+ {"expireat",expireatCommand,3,0,NULL,1,1,1,0,0},
{"keys",keysCommand,2,0,NULL,0,0,0,0,0},
{"dbsize",dbsizeCommand,1,0,NULL,0,0,0,0,0},
{"auth",authCommand,2,0,NULL,0,0,0,0,0},
{"lastsave",lastsaveCommand,1,0,NULL,0,0,0,0,0},
{"type",typeCommand,2,0,NULL,1,1,1,0,0},
{"multi",multiCommand,1,0,NULL,0,0,0,0,0},
- {"exec",execCommand,1,REDIS_CMD_DENYOOM,execBlockClientOnSwappedKeys,0,0,0,0,0},
+ {"exec",execCommand,1,REDIS_CMD_DENYOOM,NULL,0,0,0,0,0},
{"discard",discardCommand,1,0,NULL,0,0,0,0,0},
{"sync",syncCommand,1,0,NULL,0,0,0,0,0},
{"flushdb",flushdbCommand,1,0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,0,NULL,0,0,0,0,0},
{"publish",publishCommand,3,REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0,0,0},
- {"watch",watchCommand,-2,0,NULL,0,0,0,0,0},
- {"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0}
+ {"watch",watchCommand,-2,0,noPreloadGetKeys,1,-1,1,0,0},
+ {"unwatch",unwatchCommand,1,0,NULL,0,0,0,0,0},
+ {"cluster",clusterCommand,-2,0,NULL,0,0,0,0,0},
+ {"restore",restoreCommand,4,0,NULL,0,0,0,0,0},
+ {"migrate",migrateCommand,6,0,NULL,0,0,0,0,0},
+ {"dump",dumpCommand,2,0,NULL,0,0,0,0,0},
+ {"object",objectCommand,-2,0,NULL,0,0,0,0,0}
};
/*============================ Utility functions ============================ */
dictListDestructor /* val destructor */
};
+ /* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
+ * clusterNode structures. */
+ dictType clusterNodesDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ dictSdsDestructor, /* key destructor */
+ NULL /* val destructor */
+ };
+
int htNeedsResize(dict *dict) {
long long size, used;
* to detect transfer failures. */
if (!(loops % 10)) replicationCron();
+ /* Run other sub-systems specific cron jobs */
+ if (server.cluster_enabled && !(loops % 10)) clusterCron();
+
server.cronloops++;
return 100;
}
server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
+ server.zset_max_ziplist_entries = REDIS_ZSET_MAX_ZIPLIST_ENTRIES;
+ server.zset_max_ziplist_value = REDIS_ZSET_MAX_ZIPLIST_VALUE;
server.shutdown_asap = 0;
server.cache_flush_delay = 0;
+ server.cluster_enabled = 0;
+ server.cluster.configfile = zstrdup("nodes.conf");
updateLRUClock();
resetServerSaveParams();
}
if (server.ds_enabled) dsInit();
+ if (server.cluster_enabled) clusterInit();
srand(time(NULL)^getpid());
}
return REDIS_OK;
}
+ /* If cluster is enabled, redirect here */
+ if (server.cluster_enabled &&
+ !(cmd->getkeys_proc == NULL && cmd->firstkey == 0)) {
+ int hashslot;
+
+ if (server.cluster.state != REDIS_CLUSTER_OK) {
+ addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
+ return REDIS_OK;
+ } else {
+ clusterNode *n = getNodeByQuery(c,cmd,c->argv,c->argc,&hashslot);
+ if (n == NULL) {
+ addReplyError(c,"Invalid cross-node request");
+ return REDIS_OK;
+ } else if (n != server.cluster.myself) {
+ addReplySds(c,sdscatprintf(sdsempty(),
+ "-MOVED %d %s:%d\r\n",hashslot,n->ip,n->port));
+ return REDIS_OK;
+ }
+ }
+ }
+
/* Handle the maxmemory directive.
*
* First we try to free some memory if possible (if there are volatile
#include <inttypes.h>
#include <pthread.h>
#include <syslog.h>
+ #include <netinet/in.h>
#include "ae.h" /* Event driven programming library */
#include "sds.h" /* Dynamic safe strings */
#define REDIS_ZSET 3
#define REDIS_HASH 4
#define REDIS_VMPOINTER 8
+
/* Object types only used for persistence in .rdb files */
#define REDIS_HASH_ZIPMAP 9
#define REDIS_LIST_ZIPLIST 10
#define REDIS_SET_INTSET 11
+#define REDIS_ZSET_ZIPLIST 12
/* Objects encoding. Some kind of objects like Strings and Hashes can be
* internally represented in multiple ways. The 'encoding' field of the object
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define REDIS_ENCODING_INTSET 6 /* Encoded as intset */
+ #define REDIS_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512
#define REDIS_LIST_MAX_ZIPLIST_VALUE 64
#define REDIS_SET_MAX_INTSET_ENTRIES 512
+#define REDIS_ZSET_MAX_ZIPLIST_ENTRIES 128
+#define REDIS_ZSET_MAX_ZIPLIST_VALUE 64
/* Sets operations codes */
#define REDIS_OP_UNION 0
*integers[REDIS_SHARED_INTEGERS];
};
- /* Global server state structure */
+ /*-----------------------------------------------------------------------------
+ * Redis cluster data structures
+ *----------------------------------------------------------------------------*/
+
+ #define REDIS_CLUSTER_SLOTS 4096
+ #define REDIS_CLUSTER_OK 0 /* Everything looks ok */
+ #define REDIS_CLUSTER_FAIL 1 /* The cluster can't work */
+ #define REDIS_CLUSTER_NEEDHELP 2 /* The cluster works, but needs some help */
+ #define REDIS_CLUSTER_NAMELEN 40 /* sha1 hex length */
+ #define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */
+
+ struct clusterNode;
+
+ /* clusterLink encapsulates everything needed to talk with a remote node. */
+ typedef struct clusterLink {
+ int fd; /* TCP socket file descriptor */
+ sds sndbuf; /* Packet send buffer */
+ sds rcvbuf; /* Packet reception buffer */
+ struct clusterNode *node; /* Node related to this link if any, or NULL */
+ } clusterLink;
+
+ /* Node flags */
+ #define REDIS_NODE_MASTER 1 /* The node is a master */
+ #define REDIS_NODE_SLAVE 2 /* The node is a slave */
+ #define REDIS_NODE_PFAIL 4 /* Failure? Need acknowledge */
+ #define REDIS_NODE_FAIL 8 /* The node is believed to be malfunctioning */
+ #define REDIS_NODE_MYSELF 16 /* This node is myself */
+ #define REDIS_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */
+ #define REDIS_NODE_NOADDR 64 /* We don't know the address of this node */
+ #define REDIS_NODE_MEET 128 /* Send a MEET message to this node */
+ #define REDIS_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
+
+ struct clusterNode {
+ char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
+ int flags; /* REDIS_NODE_... */
+ unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
+ int numslaves; /* Number of slave nodes, if this is a master */
+ struct clusterNode **slaves; /* pointers to slave nodes */
+ struct clusterNode *slaveof; /* pointer to the master node */
+ time_t ping_sent; /* Unix time we sent latest ping */
+ time_t pong_received; /* Unix time we received the pong */
+ char *configdigest; /* Configuration digest of this node */
+ time_t configdigest_ts; /* Configuration digest timestamp */
+ char ip[16]; /* Latest known IP address of this node */
+ int port; /* Latest known port of this node */
+ clusterLink *link; /* TCP/IP link with this node */
+ };
+ typedef struct clusterNode clusterNode;
+
+ typedef struct {
+ char *configfile;
+ clusterNode *myself; /* This node */
+ int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
+ int node_timeout;
+ dict *nodes; /* Hash table of name -> clusterNode structures */
+ clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
+ clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
+ clusterNode *slots[REDIS_CLUSTER_SLOTS];
+ } clusterState;
+
+ /* Redis cluster messages header */
+
+ /* Note that the PING, PONG and MEET messages are actually the same exact
+ * kind of packet. PONG is the reply to ping, in the extact format as a PING,
+ * while MEET is a special PING that forces the receiver to add the sender
+ * as a node (if it is not already in the list). */
+ #define CLUSTERMSG_TYPE_PING 0 /* Ping */
+ #define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
+ #define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
+ #define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
+
+ /* Initially we don't know our "name", but we'll find it once we connect
+ * to the first node, using the getsockname() function. Then we'll use this
+ * address for all the next messages. */
+ typedef struct {
+ char nodename[REDIS_CLUSTER_NAMELEN];
+ uint32_t ping_sent;
+ uint32_t pong_received;
+ char ip[16]; /* IP address last time it was seen */
+ uint16_t port; /* port last time it was seen */
+ uint16_t flags;
+ uint32_t notused; /* for 64 bit alignment */
+ } clusterMsgDataGossip;
+
+ typedef struct {
+ char nodename[REDIS_CLUSTER_NAMELEN];
+ } clusterMsgDataFail;
+
+ union clusterMsgData {
+ /* PING, MEET and PONG */
+ struct {
+ /* Array of N clusterMsgDataGossip structures */
+ clusterMsgDataGossip gossip[1];
+ } ping;
+ /* FAIL */
+ struct {
+ clusterMsgDataFail about;
+ } fail;
+ };
+
+ typedef struct {
+ uint32_t totlen; /* Total length of this message */
+ uint16_t type; /* Message type */
+ uint16_t count; /* Only used for some kind of messages. */
+ char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */
+ unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
+ char slaveof[REDIS_CLUSTER_NAMELEN];
+ char configdigest[32];
+ uint16_t port; /* Sender TCP base port */
+ unsigned char state; /* Cluster state from the POV of the sender */
+ unsigned char notused[5]; /* Reserved for future use. For alignment. */
+ union clusterMsgData data;
+ } clusterMsg;
+
+ /*-----------------------------------------------------------------------------
+ * Global server state
+ *----------------------------------------------------------------------------*/
+
struct redisServer {
/* General */
pthread_t mainthread;
char *unixsocket;
int ipfd;
int sofd;
+ int cfd;
list *clients;
list *slaves, *monitors;
char neterr[ANET_ERR_LEN];
size_t list_max_ziplist_entries;
size_t list_max_ziplist_value;
size_t set_max_intset_entries;
+ size_t zset_max_ziplist_entries;
+ size_t zset_max_ziplist_value;
time_t unixtime; /* Unix time sampled every second. */
/* Virtual memory I/O threads stuff */
/* An I/O thread process an element taken from the io_jobs queue and
/* Misc */
unsigned lruclock:22; /* clock incrementing every minute, for LRU */
unsigned lruclock_padding:10;
+ int cluster_enabled;
+ clusterState cluster;
};
typedef struct pubsubPattern {
} pubsubPattern;
typedef void redisCommandProc(redisClient *c);
- typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
+ typedef int *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags);
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
int flags;
- /* Use a function to determine which keys need to be loaded
- * in the background prior to executing this command. Takes precedence
- * over vm_firstkey and others, ignored when NULL */
- redisVmPreloadProc *vm_preload_proc;
+ /* Use a function to determine keys arguments in a command line.
+ * Used both for diskstore preloading and Redis Cluster. */
+ redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
- int vm_firstkey; /* The first argument that's a key (0 = no keys) */
- int vm_lastkey; /* THe last argument that's a key */
- int vm_keystep; /* The step between first and last key */
+ int firstkey; /* The first argument that's a key (0 = no keys) */
+ int lastkey; /* THe last argument that's a key */
+ int keystep; /* The step between first and last key */
long long microseconds, calls;
};
extern struct sharedObjectsStruct shared;
extern dictType setDictType;
extern dictType zsetDictType;
+ extern dictType clusterNodesDictType;
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
dictType hashDictType;
robj *createIntsetObject(void);
robj *createHashObject(void);
robj *createZsetObject(void);
+robj *createZsetZiplistObject(void);
int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg);
int checkType(redisClient *c, robj *o, int type);
int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, const char *msg);
int fwriteBulkDouble(FILE *fp, double d);
int fwriteBulkLongLong(FILE *fp, long long l);
int fwriteBulkObject(FILE *fp, robj *obj);
+ int fwriteBulkCount(FILE *fp, char prefix, int count);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
zskiplist *zslCreate(void);
void zslFree(zskiplist *zsl);
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
+unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score);
+double zzlGetScore(unsigned char *sptr);
+void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
+void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
+unsigned int zsetLength(robj *zobj);
+void zsetConvert(robj *zobj, int encoding);
/* Core functions */
void freeMemoryIfNeeded(void);
void queueIOJob(iojob *j);
void waitEmptyIOJobsQueue(void);
void processAllPendingIOJobs(void);
- void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
- void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
int dontWaitForSwappedKey(redisClient *c, robj *key);
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
int stringmatch(const char *pattern, const char *string, int nocase);
long long memtoll(const char *p, int *err);
int ll2string(char *s, size_t len, long long value);
+int string2ll(char *s, size_t len, long long *value);
+int d2string(char *s, size_t len, double value);
int isStringRepresentableAsLong(sds s, long *longval);
int isStringRepresentableAsLongLong(sds s, long long *longval);
int isObjectRepresentableAsLongLong(robj *o, long long *llongval);
void signalModifiedKey(redisDb *db, robj *key);
void signalFlushedDb(int dbid);
+ /* API to get key arguments from commands */
+ #define REDIS_GETKEYS_ALL 0
+ #define REDIS_GETKEYS_PRELOAD 1
+ int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags);
+ void getKeysFreeResult(int *result);
+ int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
+ int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
+ int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
+
+ /* Cluster */
+ void clusterInit(void);
+ unsigned short crc16(const char *buf, int len);
+ unsigned int keyHashSlot(char *key, int keylen);
+ clusterNode *createClusterNode(char *nodename, int flags);
+ int clusterAddNode(clusterNode *node);
+ void clusterCron(void);
+ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot);
+
/* Git SHA1 */
char *redisGitSHA1(void);
char *redisGitDirty(void);
void publishCommand(redisClient *c);
void watchCommand(redisClient *c);
void unwatchCommand(redisClient *c);
+ void clusterCommand(redisClient *c);
+ void restoreCommand(redisClient *c);
+ void migrateCommand(redisClient *c);
+ void dumpCommand(redisClient *c);
+ void objectCommand(redisClient *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
#include <limits.h>
#include "zmalloc.h"
#include "ziplist.h"
+ #include "endian.h"
int ll2string(char *s, size_t len, long long value);
} else {
if (lensize) *lensize = 1+sizeof(len);
memcpy(&len,p+1,sizeof(len));
+ memrev32ifbe(&len);
}
return len;
}
} else {
p[0] = ZIP_BIGLEN;
memcpy(p+1,&len,sizeof(len));
+ memrev32ifbe(p+1);
return 1+sizeof(len);
}
}
if (p == NULL) return;
p[0] = ZIP_BIGLEN;
memcpy(p+1,&len,sizeof(len));
+ memrev32ifbe(p+1);
}
/* Return the difference in number of bytes needed to store the new length
if (encoding == ZIP_INT_16B) {
i16 = value;
memcpy(p,&i16,sizeof(i16));
+ memrev16ifbe(p);
} else if (encoding == ZIP_INT_32B) {
i32 = value;
memcpy(p,&i32,sizeof(i32));
+ memrev32ifbe(p);
} else if (encoding == ZIP_INT_64B) {
i64 = value;
memcpy(p,&i64,sizeof(i64));
+ memrev64ifbe(p);
} else {
assert(NULL);
}
int64_t i64, ret = 0;
if (encoding == ZIP_INT_16B) {
memcpy(&i16,p,sizeof(i16));
+ memrev16ifbe(&i16);
ret = i16;
} else if (encoding == ZIP_INT_32B) {
memcpy(&i32,p,sizeof(i32));
+ memrev16ifbe(&i32);
ret = i32;
} else if (encoding == ZIP_INT_64B) {
memcpy(&i64,p,sizeof(i64));
+ memrev16ifbe(&i64);
ret = i64;
} else {
assert(NULL);
* The pointer "p" points to the first entry that does NOT need to be
* updated, i.e. consecutive fields MAY need an update. */
static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
- unsigned int curlen = ZIPLIST_BYTES(zl), rawlen, rawlensize;
- unsigned int offset, noffset, extra;
+ size_t curlen = ZIPLIST_BYTES(zl), rawlen, rawlensize;
+ size_t offset, noffset, extra;
unsigned char *np;
zlentry cur, next;
/* Delete "num" entries, starting at "p". Returns pointer to the ziplist. */
static unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
unsigned int i, totlen, deleted = 0;
- int offset, nextdiff = 0;
+ size_t offset;
+ int nextdiff = 0;
zlentry first, tail;
first = zipEntry(p);
/* Insert item at "p". */
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
- unsigned int curlen = ZIPLIST_BYTES(zl), reqlen, prevlen = 0;
- unsigned int offset, nextdiff = 0;
+ size_t curlen = ZIPLIST_BYTES(zl), reqlen, prevlen = 0;
+ size_t offset;
+ int nextdiff = 0;
unsigned char encoding = 0;
long long value;
zlentry entry, tail;
* Also update *p in place, to be able to iterate over the
* ziplist, while deleting entries. */
unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p) {
- unsigned int offset = *p-zl;
+ size_t offset = *p-zl;
zl = __ziplistDelete(zl,*p,1);
/* Store pointer to current element in p, because ziplistDelete will