#include "dict.h" /* Hash tables */
#include "adlist.h" /* Linked lists */
#include "zmalloc.h" /* total memory usage aware version of malloc/free */
-#include "lzf.h"
+#include "lzf.h" /* LZF compression library */
+#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
/* Error codes */
#define REDIS_OK 0
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
-#define REDIS_MAX_ARGS 16
+#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
redisDb *db;
int dictid;
sds querybuf;
- robj *argv[REDIS_MAX_ARGS];
+ robj **argv;
int argc;
int bulklen; /* bulk read len. -1 if not in bulk read mode */
list *reply;
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
+static void sdiffCommand(redisClient *c);
+static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
{"get",getCommand,2,REDIS_CMD_INLINE},
{"set",setCommand,3,REDIS_CMD_BULK},
{"setnx",setnxCommand,3,REDIS_CMD_BULK},
- {"del",delCommand,2,REDIS_CMD_INLINE},
+ {"del",delCommand,-2,REDIS_CMD_INLINE},
{"exists",existsCommand,2,REDIS_CMD_INLINE},
{"incr",incrCommand,2,REDIS_CMD_INLINE},
{"decr",decrCommand,2,REDIS_CMD_INLINE},
{"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE},
{"sunion",sunionCommand,-2,REDIS_CMD_INLINE},
{"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE},
+ {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE},
+ {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE},
{"smembers",sinterCommand,2,REDIS_CMD_INLINE},
{"incrby",incrbyCommand,3,REDIS_CMD_INLINE},
{"decrby",decrbyCommand,3,REDIS_CMD_INLINE},
va_start(ap, fmt);
if (level >= server.verbosity) {
char *c = ".-*";
- fprintf(fp,"%c ",c[level]);
+ char buf[64];
+ time_t now;
+
+ now = time(NULL);
+ strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
+ fprintf(fp,"%s %c ",buf,c[level]);
vfprintf(fp, fmt, ap);
fprintf(fp,"\n");
fflush(fp);
}
}
+/* Shrink the hash tables of sparse databases to save memory.
+ *
+ * For every database: if the percentage of used slots in the HT drops
+ * below REDIS_HT_MINFILL, the table has more than REDIS_HT_MINSLOTS slots
+ * and it is not empty, we resize the hash table with dictResize(). A
+ * REDIS_NOTICE line is logged before and after each resize. */
+void tryResizeHashTables(void) {
+ int j;
+
+ for (j = 0; j < server.dbnum; j++) {
+ long long size, used;
+
+ size = dictSlots(server.db[j].dict); /* slots in the table */
+ used = dictSize(server.db[j].dict); /* slots in use */
+ if (size && used && size > REDIS_HT_MINSLOTS &&
+ (used*100/size < REDIS_HT_MINFILL)) { /* fill percentage below the minimum */
+ redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
+ dictResize(server.db[j].dict);
+ redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
+ }
+ }
+}
+
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j, loops = server.cronloops++;
REDIS_NOTUSED(eventLoop);
/* Update the global state with the amount of used memory */
server.usedmemory = zmalloc_used_memory();
- /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
- * we resize the hash table to save memory */
+ /* Show some info about non-empty databases */
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
- if (size && used && size > REDIS_HT_MINSLOTS &&
- (used*100/size < REDIS_HT_MINFILL)) {
- redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
- dictResize(server.db[j].dict);
- redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
- }
}
+ /* We don't want to resize the hash tables while a background save
+ * is in progress: the saving child is created using fork(), which is
+ * implemented with copy-on-write semantics on most modern systems, so
+ * if we resize the HT while the saving child is at work, the many
+ * memory writes in the parent will cause a lot of pages to be
+ * copied. */
+ if (!server.bgsaveinprogress) tryResizeHashTables();
+
/* Show information about connected clients */
if (!(loops % 5)) {
redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
}
/* Close connections of timedout clients */
- if (!(loops % 10))
+ if (server.maxidletime && !(loops % 10))
closeTimedoutClients();
/* Check if a background saving in progress terminated */
/* Execute config directives */
if (!strcasecmp(argv[0],"timeout") && argc == 2) {
server.maxidletime = atoi(argv[1]);
- if (server.maxidletime < 1) {
+ if (server.maxidletime < 0) {
err = "Invalid timeout value"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"port") && argc == 2) {
server.master = NULL;
server.replstate = REDIS_REPL_CONNECT;
}
+ zfree(c->argv);
zfree(c);
}
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
listNode *ln;
- robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */
int outc = 0, j;
+ robj **outv;
+ /* (args*2)+1 is enough room for args, spaces, newlines */
+ robj *static_outv[REDIS_STATIC_ARGS*2+1];
+
+ if (argc <= REDIS_STATIC_ARGS) {
+ outv = static_outv;
+ } else {
+ outv = zmalloc(sizeof(robj*)*(argc*2+1));
+ if (!outv) oom("replicationFeedSlaves");
+ }
for (j = 0; j < argc; j++) {
if (j != 0) outv[outc++] = shared.space;
for (j = 0; j < outc; j++) addReply(slave,outv[j]);
}
for (j = 0; j < outc; j++) decrRefCount(outv[j]);
+ if (outv != static_outv) zfree(outv);
}
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
- sdsfree(query);
if (argv == NULL) oom("sdssplitlen");
- for (j = 0; j < argc && j < REDIS_MAX_ARGS; j++) {
+ sdsfree(query);
+
+ if (c->argv) zfree(c->argv);
+ c->argv = zmalloc(sizeof(robj*)*argc);
+ if (c->argv == NULL) oom("allocating arguments list for client");
+
+ for (j = 0; j < argc; j++) {
if (sdslen(argv[j])) {
c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
c->argc++;
c->fd = fd;
c->querybuf = sdsempty();
c->argc = 0;
+ c->argv = NULL;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
if (fread(c,clen,1,fp) == 0) goto err;
if (lzf_decompress(c,clen,val,len) == 0) goto err;
+ zfree(c);
return createObject(REDIS_STRING,val);
err:
zfree(c);
/* ========================= Type agnostic commands ========================= */
static void delCommand(redisClient *c) {
- if (deleteKey(c->db,c->argv[1])) {
- server.dirty++;
- addReply(c,shared.cone);
- } else {
+ int deleted = 0, j; /* deleted: how many of the given keys deleteKey() removed */
+
+ for (j = 1; j < c->argc; j++) { /* every argument from argv[1] on is treated as a key */
+ if (deleteKey(c->db,c->argv[j])) {
+ server.dirty++; /* one dirty increment per key actually deleted */
+ deleted++;
+ }
+ }
+ switch(deleted) { /* reply with the number of deleted keys */
+ case 0:
 addReply(c,shared.czero);
+ break;
+ case 1:
+ addReply(c,shared.cone);
+ break;
+ default: /* two or more: no shared object is used, format the integer reply */
+ addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
+ break;
 }
}
/* If we have a target key where to store the resulting set
* create this key with an empty set inside */
dstset = createSetObject();
- deleteKey(c->db,dstkey);
- dictAdd(c->db->dict,dstkey,dstset);
- incrRefCount(dstkey);
}
/* Iterate all the elements of the first (smallest) set, and test
}
dictReleaseIterator(di);
+ if (dstkey) {
+ /* Store the resulting set into the target */
+ deleteKey(c->db,dstkey);
+ dictAdd(c->db->dict,dstkey,dstset);
+ incrRefCount(dstkey);
+ }
+
if (!dstkey) {
lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
} else {
sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
}
-static void sunionGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
+#define REDIS_OP_UNION 0
+#define REDIS_OP_DIFF 1
+
+static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
dict **dv = zmalloc(sizeof(dict*)*setsnum);
dictIterator *di;
dictEntry *de;
- robj *lenobj = NULL, *dstset = NULL;
+ robj *dstset = NULL;
int j, cardinality = 0;
- if (!dv) oom("sunionCommand");
+ if (!dv) oom("sunionDiffGenericCommand");
for (j = 0; j < setsnum; j++) {
robj *setobj;
* this set object will be the resulting object to set into the target key*/
dstset = createSetObject();
- /* The first thing we should output is the total number of elements...
- * since this is a multi-bulk write, but at this stage we don't know
- * the intersection set size, so we use a trick, append an empty object
- * to the output list and save the pointer to later modify it with the
- * right length */
- if (!dstkey) {
- lenobj = createObject(REDIS_STRING,NULL);
- addReply(c,lenobj);
- decrRefCount(lenobj);
- } else {
- /* If we have a target key where to store the resulting set
- * create this key with an empty set inside */
- deleteKey(c->db,dstkey);
- dictAdd(c->db->dict,dstkey,dstset);
- incrRefCount(dstkey);
- server.dirty++;
- }
-
/* Iterate all the elements of all the sets, add every element a single
* time to the result set */
for (j = 0; j < setsnum; j++) {
+ if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
if (!dv[j]) continue; /* non existing keys are like empty sets */
di = dictGetIterator(dv[j]);
/* dictAdd will not add the same element multiple times */
ele = dictGetEntryKey(de);
- if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
- incrRefCount(ele);
- if (!dstkey) {
- addReplySds(c,sdscatprintf(sdsempty(),
- "$%d\r\n",sdslen(ele->ptr)));
- addReply(c,ele);
- addReply(c,shared.crlf);
+ if (op == REDIS_OP_UNION || j == 0) {
+ if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
+ incrRefCount(ele);
cardinality++;
}
+ } else if (op == REDIS_OP_DIFF) {
+ if (dictDelete(dstset->ptr,ele) == DICT_OK) {
+ cardinality--;
+ }
}
}
dictReleaseIterator(di);
+
+ if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
}
+ /* Output the content of the resulting set, if not in STORE mode */
+ if (!dstkey) {
+ addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
+ di = dictGetIterator(dstset->ptr);
+ if (!di) oom("dictGetIterator");
+ while((de = dictNext(di)) != NULL) {
+ robj *ele;
+
+ ele = dictGetEntryKey(de);
+ addReplySds(c,sdscatprintf(sdsempty(),
+ "$%d\r\n",sdslen(ele->ptr)));
+ addReply(c,ele);
+ addReply(c,shared.crlf);
+ }
+ dictReleaseIterator(di);
+ } else {
+ /* If we have a target key where to store the resulting set
+ * create this key with the result set inside */
+ deleteKey(c->db,dstkey);
+ dictAdd(c->db->dict,dstkey,dstset);
+ incrRefCount(dstkey);
+ server.dirty++;
+ }
+
+ /* Cleanup */
if (!dstkey) {
- lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
decrRefCount(dstset);
} else {
addReply(c,shared.ok);
}
static void sunionCommand(redisClient *c) {
- sunionGenericCommand(c,c->argv+1,c->argc-1,NULL);
+ sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
}
static void sunionstoreCommand(redisClient *c) {
- sunionGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
+ sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
+}
+
+static void sdiffCommand(redisClient *c) { /* "sdiff" command handler:
+ * the set keys start at argv[1]; passes a NULL destination key and
+ * REDIS_OP_DIFF to the shared union/diff implementation, so the result
+ * is sent to the client instead of being stored. */
+ sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
+}
+
+static void sdiffstoreCommand(redisClient *c) { /* "sdiffstore" command
+ * handler: argv[1] is the destination key and the set keys start at
+ * argv[2]; runs the shared union/diff implementation with REDIS_OP_DIFF
+ * so the result is stored under the destination key. */
+ sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
}
static void flushdbCommand(redisClient *c) {
server.sort_desc = desc;
server.sort_alpha = alpha;
server.sort_bypattern = sortby ? 1 : 0;
- qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
+ if (sortby && (start != 0 || end != vectorlen-1))
+ pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
+ else
+ qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
}
/* Send command output to the output buffer, performing the specified
/* =================================== Main! ================================ */
+#ifdef __linux__
+/* Read the kernel's overcommit_memory setting.
+ *
+ * Returns the integer found at the start of
+ * /proc/sys/vm/overcommit_memory, or -1 if the file cannot be opened,
+ * cannot be read, or does not begin with a number.
+ *
+ * Unparsable content is reported as -1 rather than atoi()'s silent 0,
+ * because 0 is a meaningful setting that callers test for. */
+int linuxOvercommitMemoryValue(void) {
+    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
+    char buf[64];
+    char *end;
+    long value;
+
+    if (!fp) return -1;
+    if (fgets(buf,sizeof(buf),fp) == NULL) {
+        fclose(fp);
+        return -1;
+    }
+    fclose(fp);
+
+    /* strtol() lets us tell a real "0" apart from "no digits at all";
+     * anything after the number (e.g. the trailing newline) is ignored. */
+    value = strtol(buf,&end,10);
+    if (end == buf) return -1;
+    return (int)value;
+}
+
+/* Log a REDIS_WARNING message when linuxOvercommitMemoryValue() reports
+ * that overcommit_memory is 0. Nothing is logged for any other value,
+ * including the -1 returned when the setting could not be read. */
+void linuxOvercommitMemoryWarning(void) {
+    if (linuxOvercommitMemoryValue() == 0) {
+        redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
+    }
+}
+#endif /* __linux__ */
+
static void daemonize(void) {
int fd;
FILE *fp;
}
int main(int argc, char **argv) {
+#ifdef __linux__
+ linuxOvercommitMemoryWarning();
+#endif
+
initServerConfig();
if (argc == 2) {
ResetServerSaveParams();