From: antirez Date: Tue, 2 Nov 2010 22:47:52 +0000 (+0100) Subject: Merge remote branch 'pietern/unixsocket' X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/0a546fc01758f9a9f8b2113764c2cf963df6ef20?ds=inline;hp=-c Merge remote branch 'pietern/unixsocket' --- 0a546fc01758f9a9f8b2113764c2cf963df6ef20 diff --combined redis.conf index 8d69f929,3a2b45e2..8ad5cc2e --- a/redis.conf +++ b/redis.conf @@@ -20,7 -20,7 +20,7 @@@ daemonize n # default. You can specify a custom pid file location here. pidfile /var/run/redis.pid - # Accept connections on the specified port, default is 6379 + # Accept connections on the specified port, default is 6379. port 6379 # If you want you can bind a single interface, if the bind option is not @@@ -28,6 -28,12 +28,12 @@@ # # bind 127.0.0.1 + # Specify the path for the unix socket that will be used to listen for + # incoming connections. There is no default, so Redis will not listen + # on a unix socket when not specified. + # + # unixsocket /tmp/redis.sock + # Close the connection after a client is idle for N seconds (0 to disable) timeout 300 @@@ -148,25 -154,6 +154,25 @@@ dir . # # maxmemory +# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory +# is reached? You can select among five behaviors: +# +# volatile-lru -> remove the key with an expire set using an LRU algorithm +# allkeys-lru -> remove any key according to the LRU algorithm +# volatile-random -> remove a random key with an expire set +# allkeys-random -> remove a random key, any key +# volatile-ttl -> remove the key with the nearest expire time (minor TTL) +# +# maxmemory-policy volatile-lru + +# LRU and minimal TTL algorithms are not precise algorithms but approximated +# algorithms (in order to save memory), so you can select as well the sample +# size to check. For instance, by default Redis will check three keys and +# pick the one that was used least recently; you can change the sample size +# using the following configuration directive. 
+# +# maxmemory-samples 3 + ############################## APPEND ONLY MODE ############################### # By default Redis asynchronously dumps the dataset on disk. If you can live diff --combined src/aof.c index 4dbce394,eb67a7bd..2396ba2c --- a/src/aof.c +++ b/src/aof.c @@@ -266,6 -266,9 +266,6 @@@ int loadAppendOnlyFile(char *filename) redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr); exit(1); } - /* Try object encoding */ - if (cmd->flags & REDIS_CMD_BULK) - argv[argc-1] = tryObjectEncoding(argv[argc-1]); /* Run the command in the context of a fake client */ fakeClient->argc = argc; fakeClient->argv = argv; @@@ -311,6 -314,55 +311,6 @@@ fmterr exit(1); } -/* Write binary-safe string into a file in the bulkformat - * $\r\n\r\n */ -int fwriteBulkString(FILE *fp, char *s, unsigned long len) { - char cbuf[128]; - int clen; - cbuf[0] = '$'; - clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len); - cbuf[clen++] = '\r'; - cbuf[clen++] = '\n'; - if (fwrite(cbuf,clen,1,fp) == 0) return 0; - if (len > 0 && fwrite(s,len,1,fp) == 0) return 0; - if (fwrite("\r\n",2,1,fp) == 0) return 0; - return 1; -} - -/* Write a double value in bulk format $\r\n\r\n */ -int fwriteBulkDouble(FILE *fp, double d) { - char buf[128], dbuf[128]; - - snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d); - snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2); - if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; - if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0; - return 1; -} - -/* Write a long value in bulk format $\r\n\r\n */ -int fwriteBulkLongLong(FILE *fp, long long l) { - char bbuf[128], lbuf[128]; - unsigned int blen, llen; - llen = ll2string(lbuf,32,l); - blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf); - if (fwrite(bbuf,blen,1,fp) == 0) return 0; - return 1; -} - -/* Delegate writing an object to writing a bulk string or bulk long long. 
*/ -int fwriteBulkObject(FILE *fp, robj *obj) { - /* Avoid using getDecodedObject to help copy-on-write (we are often - * in a child process when this function is called). */ - if (obj->encoding == REDIS_ENCODING_INT) { - return fwriteBulkLongLong(fp,(long)obj->ptr); - } else if (obj->encoding == REDIS_ENCODING_RAW) { - return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr)); - } else { - redisPanic("Unknown string encoding"); - } -} - /* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */ int rewriteAppendOnlyFile(char *filename) { @@@ -549,7 -601,8 +549,8 @@@ int rewriteAppendOnlyFileBackground(voi char tmpfile[256]; if (server.vm_enabled) vmReopenSwapFile(); - close(server.fd); + if (server.ipfd > 0) close(server.ipfd); + if (server.sofd > 0) close(server.sofd); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { _exit(0); diff --combined src/config.c index db58a236,4257fc36..bbe9d402 --- a/src/config.c +++ b/src/config.c @@@ -71,6 -71,8 +71,8 @@@ void loadServerConfig(char *filename) } } else if (!strcasecmp(argv[0],"bind") && argc == 2) { server.bindaddr = zstrdup(argv[1]); + } else if (!strcasecmp(argv[0],"unixsocket") && argc == 2) { + server.unixsocket = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"save") && argc == 3) { int seconds = atoi(argv[1]); int changes = atoi(argv[2]); @@@ -123,27 -125,6 +125,27 @@@ server.maxclients = atoi(argv[1]); } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) { server.maxmemory = memtoll(argv[1],NULL); + } else if (!strcasecmp(argv[0],"maxmemory-policy") && argc == 2) { + if (!strcasecmp(argv[1],"volatile-lru")) { + server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU; + } else if (!strcasecmp(argv[1],"volatile-random")) { + server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_RANDOM; + } else if (!strcasecmp(argv[1],"volatile-ttl")) { + server.maxmemory_policy = 
REDIS_MAXMEMORY_VOLATILE_TTL; + } else if (!strcasecmp(argv[1],"allkeys-lru")) { + server.maxmemory_policy = REDIS_MAXMEMORY_ALLKEYS_LRU; + } else if (!strcasecmp(argv[1],"allkeys-random")) { + server.maxmemory_policy = REDIS_MAXMEMORY_ALLKEYS_RANDOM; + } else { + err = "Invalid maxmemory policy"; + goto loaderr; + } + } else if (!strcasecmp(argv[0],"maxmemory-samples") && argc == 2) { + server.maxmemory_samples = atoi(argv[1]); + if (server.maxmemory_samples <= 0) { + err = "maxmemory-samples must be 1 or greater"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) { server.masterhost = sdsnew(argv[1]); server.masterport = atoi(argv[2]); @@@ -246,11 -227,8 +248,11 @@@ loaderr *----------------------------------------------------------------------------*/ void configSetCommand(redisClient *c) { - robj *o = getDecodedObject(c->argv[3]); + robj *o; long long ll; + redisAssert(c->argv[2]->encoding == REDIS_ENCODING_RAW); + redisAssert(c->argv[3]->encoding == REDIS_ENCODING_RAW); + o = c->argv[3]; if (!strcasecmp(c->argv[2]->ptr,"dbfilename")) { zfree(server.dbfilename); @@@ -266,24 -244,6 +268,24 @@@ ll < 0) goto badfmt; server.maxmemory = ll; if (server.maxmemory) freeMemoryIfNeeded(); + } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory-policy")) { + if (!strcasecmp(o->ptr,"volatile-lru")) { + server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU; + } else if (!strcasecmp(o->ptr,"volatile-random")) { + server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_RANDOM; + } else if (!strcasecmp(o->ptr,"volatile-ttl")) { + server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_TTL; + } else if (!strcasecmp(o->ptr,"allkeys-lru")) { + server.maxmemory_policy = REDIS_MAXMEMORY_ALLKEYS_LRU; + } else if (!strcasecmp(o->ptr,"allkeys-random")) { + server.maxmemory_policy = REDIS_MAXMEMORY_ALLKEYS_RANDOM; + } else { + goto badfmt; + } + } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory-samples")) { + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll <= 0) goto 
badfmt; + server.maxmemory_samples = ll; } else if (!strcasecmp(c->argv[2]->ptr,"timeout")) { if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0 || ll > LONG_MAX) goto badfmt; @@@ -315,6 -275,7 +317,6 @@@ if (startAppendOnly() == REDIS_ERR) { addReplyError(c, "Unable to turn on AOF. Check server logs."); - decrRefCount(o); return; } } @@@ -356,8 -317,10 +358,8 @@@ } else { addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s", (char*)c->argv[2]->ptr); - decrRefCount(o); return; } - decrRefCount(o); addReply(c,shared.ok); return; @@@ -365,15 -328,14 +367,15 @@@ badfmt: /* Bad format errors * addReplyErrorFormat(c,"Invalid argument '%s' for CONFIG SET '%s'", (char*)o->ptr, (char*)c->argv[2]->ptr); - decrRefCount(o); } void configGetCommand(redisClient *c) { - robj *o = getDecodedObject(c->argv[2]); + robj *o = c->argv[2]; void *replylen = addDeferredMultiBulkLength(c); char *pattern = o->ptr; + char buf[128]; int matches = 0; + redisAssert(o->encoding == REDIS_ENCODING_RAW); if (stringmatch(pattern,"dbfilename",0)) { addReplyBulkCString(c,"dbfilename"); @@@ -391,34 -353,17 +393,34 @@@ matches++; } if (stringmatch(pattern,"maxmemory",0)) { - char buf[128]; - - ll2string(buf,128,server.maxmemory); + ll2string(buf,sizeof(buf),server.maxmemory); addReplyBulkCString(c,"maxmemory"); addReplyBulkCString(c,buf); matches++; } - if (stringmatch(pattern,"timeout",0)) { - char buf[128]; + if (stringmatch(pattern,"maxmemory-policy",0)) { + char *s; - ll2string(buf,128,server.maxidletime); + switch(server.maxmemory_policy) { + case REDIS_MAXMEMORY_VOLATILE_LRU: s = "volatile-lru"; break; + case REDIS_MAXMEMORY_VOLATILE_TTL: s = "volatile-ttl"; break; + case REDIS_MAXMEMORY_VOLATILE_RANDOM: s = "volatile-random"; break; + case REDIS_MAXMEMORY_ALLKEYS_LRU: s = "allkeys-lru"; break; + case REDIS_MAXMEMORY_ALLKEYS_RANDOM: s = "allkeys-random"; break; + default: s = "unknown"; break; /* too harmless to panic */ + } + addReplyBulkCString(c,"maxmemory-policy"); + 
addReplyBulkCString(c,s); + matches++; + } + if (stringmatch(pattern,"maxmemory-samples",0)) { + ll2string(buf,sizeof(buf),server.maxmemory_samples); + addReplyBulkCString(c,"maxmemory-samples"); + addReplyBulkCString(c,buf); + matches++; + } + if (stringmatch(pattern,"timeout",0)) { + ll2string(buf,sizeof(buf),server.maxidletime); addReplyBulkCString(c,"timeout"); addReplyBulkCString(c,buf); matches++; @@@ -462,6 -407,7 +464,6 @@@ sdsfree(buf); matches++; } - decrRefCount(o); setDeferredMultiBulkLength(c,replylen,matches*2); } @@@ -474,11 -420,10 +476,11 @@@ void configCommand(redisClient *c) configGetCommand(c); } else if (!strcasecmp(c->argv[1]->ptr,"resetstat")) { if (c->argc != 2) goto badarity; + server.stat_keyspace_hits = 0; + server.stat_keyspace_misses = 0; server.stat_numcommands = 0; server.stat_numconnections = 0; server.stat_expiredkeys = 0; - server.stat_starttime = time(NULL); addReply(c,shared.ok); } else { addReplyError(c, diff --combined src/networking.c index 6181799a,d1c6a75a..d2eb2543 --- a/src/networking.c +++ b/src/networking.c @@@ -28,11 -28,13 +28,11 @@@ redisClient *createClient(int fd) selectDb(c,0); c->fd = fd; c->querybuf = sdsempty(); - c->newline = NULL; + c->reqtype = 0; c->argc = 0; c->argv = NULL; + c->multibulklen = 0; c->bulklen = -1; - c->multibulk = 0; - c->mbargc = 0; - c->mbargv = NULL; c->sentlen = 0; c->flags = 0; c->lastinteraction = time(NULL); @@@ -55,12 -57,7 +55,12 @@@ return c; } +/* Set the event loop to listen for write events on the client's socket. + * Typically gets called every time a reply is built. */ int _installWriteEvent(redisClient *c) { + /* When CLOSE_AFTER_REPLY is set, no more replies may be added! 
*/ + redisAssert(!(c->flags & REDIS_CLOSE_AFTER_REPLY)); + if (c->fd <= 0) return REDIS_ERR; if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || @@@ -339,23 -336,11 +339,11 @@@ void addReplyBulkCString(redisClient *c } } - void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cport, cfd; - char cip[128]; + static void acceptCommonHandler(int fd) { redisClient *c; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - REDIS_NOTUSED(privdata); - - cfd = anetAccept(server.neterr, fd, cip, &cport); - if (cfd == AE_ERR) { - redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); - return; - } - redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); - if ((c = createClient(cfd)) == NULL) { + if ((c = createClient(fd)) == NULL) { redisLog(REDIS_WARNING,"Error allocating resoures for the client"); - close(cfd); /* May be already closed, just ingore errors */ + close(fd); /* May be already closed, just ingore errors */ return; } /* If maxclient directive is set and this is one client more... 
close the @@@ -375,11 -360,47 +363,43 @@@ server.stat_numconnections++; } + void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cport, cfd; + char cip[128]; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetTcpAccept(server.neterr, fd, cip, &cport); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); + acceptCommonHandler(cfd); + } + + void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cfd; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetUnixAccept(server.neterr, fd); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket); + acceptCommonHandler(cfd); + } + + static void freeClientArgv(redisClient *c) { int j; - for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); - for (j = 0; j < c->mbargc; j++) - decrRefCount(c->mbargv[j]); c->argc = 0; - c->mbargc = 0; } void freeClient(redisClient *c) { @@@ -460,6 -481,7 +480,6 @@@ } /* Release memory */ zfree(c->argv); - zfree(c->mbargv); freeClientMultiState(c); zfree(c); } @@@ -544,9 -566,6 +564,9 @@@ void sendReplyToClient(aeEventLoop *el if (listLength(c->reply) == 0) { c->sentlen = 0; aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + + /* Close connection after entire reply has been sent. 
*/ + if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c); } } @@@ -631,9 -650,9 +651,9 @@@ void sendReplyToClientWritev(aeEventLoo /* resetClient prepare the client to process the next command */ void resetClient(redisClient *c) { freeClientArgv(c); + c->reqtype = 0; + c->multibulklen = 0; c->bulklen = -1; - c->multibulk = 0; - c->newline = NULL; } void closeTimedoutClients(void) { @@@ -664,172 -683,90 +684,172 @@@ } } -void processInputBuffer(redisClient *c) { - int seeknewline = 0; - -again: - /* Before to process the input buffer, make sure the client is not - * waitig for a blocking operation such as BLPOP. Note that the first - * iteration the client is never blocked, otherwise the processInputBuffer - * would not be called at all, but after the execution of the first commands - * in the input buffer the client may be blocked, and the "goto again" - * will try to reiterate. The following line will make it return asap. */ - if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; - - if (seeknewline && c->bulklen == -1) c->newline = strchr(c->querybuf,'\n'); - seeknewline = 1; - if (c->bulklen == -1) { - /* Read the first line of the query */ - size_t querylen; - - if (c->newline) { - char *p = c->newline; - sds query, *argv; - int argc, j; - - c->newline = NULL; - query = c->querybuf; - c->querybuf = sdsempty(); - querylen = 1+(p-(query)); - if (sdslen(query) > querylen) { - /* leave data after the first line of the query in the buffer */ - c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen); - } - *p = '\0'; /* remove "\n" */ - if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */ - sdsupdatelen(query); - - /* Now we can split the query in arguments */ - argv = sdssplitlen(query,sdslen(query)," ",1,&argc); - sdsfree(query); - - if (c->argv) zfree(c->argv); - c->argv = zmalloc(sizeof(robj*)*argc); - - for (j = 0; j < argc; j++) { - if (sdslen(argv[j])) { - c->argv[c->argc] = createObject(REDIS_STRING,argv[j]); - c->argc++; 
- } else { - sdsfree(argv[j]); +int processInlineBuffer(redisClient *c) { + char *newline = strstr(c->querybuf,"\r\n"); + int argc, j; + sds *argv; + size_t querylen; + + /* Nothing to do without a \r\n */ + if (newline == NULL) + return REDIS_ERR; + + /* Split the input buffer up to the \r\n */ + querylen = newline-(c->querybuf); + argv = sdssplitlen(c->querybuf,querylen," ",1,&argc); + + /* Leave data after the first line of the query in the buffer */ + c->querybuf = sdsrange(c->querybuf,querylen+2,-1); + + /* Setup argv array on client structure */ + if (c->argv) zfree(c->argv); + c->argv = zmalloc(sizeof(robj*)*argc); + + /* Create redis objects for all arguments. */ + for (c->argc = 0, j = 0; j < argc; j++) { + if (sdslen(argv[j])) { + c->argv[c->argc] = createObject(REDIS_STRING,argv[j]); + c->argc++; + } else { + sdsfree(argv[j]); + } + } + zfree(argv); + return REDIS_OK; +} + +/* Helper function. Trims query buffer to make the function that processes + * multi bulk requests idempotent. */ +static void setProtocolError(redisClient *c, int pos) { + c->flags |= REDIS_CLOSE_AFTER_REPLY; + c->querybuf = sdsrange(c->querybuf,pos,-1); +} + +int processMultibulkBuffer(redisClient *c) { + char *newline = NULL; + char *eptr; + int pos = 0, tolerr; + long bulklen; + + if (c->multibulklen == 0) { + /* The client should have been reset */ + redisAssert(c->argc == 0); + + /* Multi bulk length cannot be read without a \r\n */ + newline = strstr(c->querybuf,"\r\n"); + if (newline == NULL) + return REDIS_ERR; + + /* We know for sure there is a whole line since newline != NULL, + * so go ahead and find out the multi bulk length. 
*/ + redisAssert(c->querybuf[0] == '*'); + c->multibulklen = strtol(c->querybuf+1,&eptr,10); + pos = (newline-c->querybuf)+2; + if (c->multibulklen <= 0) { + c->querybuf = sdsrange(c->querybuf,pos,-1); + return REDIS_OK; + } else if (c->multibulklen > 1024*1024) { + addReplyError(c,"Protocol error: invalid multibulk length"); + setProtocolError(c,pos); + return REDIS_ERR; + } + + /* Setup argv array on client structure */ + if (c->argv) zfree(c->argv); + c->argv = zmalloc(sizeof(robj*)*c->multibulklen); + + /* Search new newline */ + newline = strstr(c->querybuf+pos,"\r\n"); + } + + redisAssert(c->multibulklen > 0); + while(c->multibulklen) { + /* Read bulk length if unknown */ + if (c->bulklen == -1) { + newline = strstr(c->querybuf+pos,"\r\n"); + if (newline != NULL) { + if (c->querybuf[pos] != '$') { + addReplyErrorFormat(c, + "Protocol error: expected '$', got '%c'", + c->querybuf[pos]); + setProtocolError(c,pos); + return REDIS_ERR; } + + bulklen = strtol(c->querybuf+pos+1,&eptr,10); + tolerr = (eptr[0] != '\r'); + if (tolerr || bulklen == LONG_MIN || bulklen == LONG_MAX || + bulklen < 0 || bulklen > 1024*1024*1024) + { + addReplyError(c,"Protocol error: invalid bulk length"); + setProtocolError(c,pos); + return REDIS_ERR; + } + pos += eptr-(c->querybuf+pos)+2; + c->bulklen = bulklen; + } else { + /* No newline in current buffer, so wait for more data */ + break; } - zfree(argv); - if (c->argc) { - /* Execute the command. If the client is still valid - * after processCommand() return and there is something - * on the query buffer try to process the next command. 
*/ - if (processCommand(c) && sdslen(c->querybuf)) goto again; + } + + /* Read bulk argument */ + if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) { + /* Not enough data (+2 == trailing \r\n) */ + break; + } else { + c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen); + pos += c->bulklen+2; + c->bulklen = -1; + c->multibulklen--; + } + } + + /* Trim to pos */ + c->querybuf = sdsrange(c->querybuf,pos,-1); + + /* We're done when c->multibulk == 0 */ + if (c->multibulklen == 0) { + return REDIS_OK; + } + return REDIS_ERR; +} + +void processInputBuffer(redisClient *c) { + /* Keep processing while there is something in the input buffer */ + while(sdslen(c->querybuf)) { + /* Immediately abort if the client is in the middle of something. */ + if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; + + /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is + * written to the client. Make sure to not let the reply grow after + * this flag has been set (i.e. don't process more commands). */ + if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; + + /* Determine request type when unknown. */ + if (!c->reqtype) { + if (c->querybuf[0] == '*') { + c->reqtype = REDIS_REQ_MULTIBULK; } else { - /* Nothing to process, argc == 0. Just process the query - * buffer if it's not empty or return to the caller */ - if (sdslen(c->querybuf)) goto again; + c->reqtype = REDIS_REQ_INLINE; } - return; - } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) { - redisLog(REDIS_VERBOSE, "Client protocol error"); - freeClient(c); - return; } - } else { - /* Bulk read handling. Note that if we are at this point - the client already sent a command terminated with a newline, - we are reading the bulk data that is actually the last - argument of the command. 
*/ - int qbl = sdslen(c->querybuf); - - if (c->bulklen <= qbl) { - /* Copy everything but the final CRLF as final argument */ - c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2); - c->argc++; - c->querybuf = sdsrange(c->querybuf,c->bulklen,-1); - /* Process the command. If the client is still valid after - * the processing and there is more data in the buffer - * try to parse it. */ - if (processCommand(c) && sdslen(c->querybuf)) goto again; - return; + + if (c->reqtype == REDIS_REQ_INLINE) { + if (processInlineBuffer(c) != REDIS_OK) break; + } else if (c->reqtype == REDIS_REQ_MULTIBULK) { + if (processMultibulkBuffer(c) != REDIS_OK) break; + } else { + redisPanic("Unknown request type"); + } + + /* Multibulk processing could see a <= 0 length. */ + if (c->argc == 0) { + resetClient(c); + } else { + /* Only reset the client when the command was executed. */ + if (processCommand(c) == REDIS_OK) + resetClient(c); } } } @@@ -856,8 -793,14 +876,8 @@@ void readQueryFromClient(aeEventLoop *e return; } if (nread) { - size_t oldlen = sdslen(c->querybuf); - c->querybuf = sdscatlen(c->querybuf, buf, nread); + c->querybuf = sdscatlen(c->querybuf,buf,nread); c->lastinteraction = time(NULL); - /* Scan this new piece of the query for the newline. 
We do this - * here in order to make sure we perform this scan just one time - * per piece of buffer, leading to an O(N) scan instead of O(N*N) */ - if (c->bulklen == -1 && c->newline == NULL) - c->newline = strchr(c->querybuf+oldlen,'\n'); } else { return; } diff --combined src/redis-benchmark.c index c5ababf2,68c46ad8..dcc13286 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@@ -71,6 -71,7 +71,7 @@@ static struct config aeEventLoop *el; char *hostip; int hostport; + char *hostsocket; int keepalive; long long start; long long totlatency; @@@ -359,7 -360,11 +360,11 @@@ static client createClient(void) client c = zmalloc(sizeof(struct _client)); char err[ANET_ERR_LEN]; - c->fd = anetTcpNonBlockConnect(err,config.hostip,config.hostport); + if (config.hostsocket == NULL) + c->fd = anetTcpNonBlockConnect(err,config.hostip,config.hostport); + else + c->fd = anetUnixNonBlockConnect(err,config.hostsocket); + if (c->fd == ANET_ERR) { zfree(c); fprintf(stderr,"Connect: %s\n",err); @@@ -455,6 -460,9 +460,9 @@@ void parseOptions(int argc, char **argv } else if (!strcmp(argv[i],"-p") && !lastarg) { config.hostport = atoi(argv[i+1]); i++; + } else if (!strcmp(argv[i],"-s") && !lastarg) { + config.hostsocket = argv[i+1]; + i++; } else if (!strcmp(argv[i],"-d") && !lastarg) { config.datasize = atoi(argv[i+1]); i++; @@@ -478,7 -486,8 +486,8 @@@ printf("Wrong option '%s' or option argument missing\n\n",argv[i]); printf("Usage: redis-benchmark [-h ] [-p ] [-c ] [-n [-k ]\n\n"); printf(" -h Server hostname (default 127.0.0.1)\n"); - printf(" -p Server port (default 6379)\n"); + printf(" -p Server port (default 6379)\n"); + printf(" -s Server socket (overrides host and port)\n"); printf(" -c Number of parallel connections (default 50)\n"); printf(" -n Total number of requests (default 10000)\n"); printf(" -d Data size of SET/GET value in bytes (default 2)\n"); @@@ -537,6 -546,7 +546,7 @@@ int main(int argc, char **argv) config.hostip = "127.0.0.1"; config.hostport = 6379; + 
config.hostsocket = NULL; parseOptions(argc,argv); @@@ -575,28 -585,10 +585,28 @@@ aeMain(config.el); endBenchmark(); + prepareForBenchmark("MSET (10 keys, multi bulk)"); + c = createClient(); + if (!c) exit(1); + c->obuf = sdscatprintf(c->obuf,"*%d\r\n$4\r\nMSET\r\n", 11); + { + int i; + char *data = zmalloc(config.datasize+2); + memset(data,'x',config.datasize); + for (i = 0; i < 10; i++) { + c->obuf = sdscatprintf(c->obuf,"$%d\r\n%s\r\n",config.datasize,data); + } + zfree(data); + } + prepareClientForReply(c,REPLY_RETCODE); + createMissingClients(c); + aeMain(config.el); + endBenchmark(); + prepareForBenchmark("SET"); c = createClient(); if (!c) exit(1); - c->obuf = sdscatprintf(c->obuf,"SET foo_rand000000000000 %d\r\n",config.datasize); + c->obuf = sdscat(c->obuf,"SET foo_rand000000000000 "); { char *data = zmalloc(config.datasize+2); memset(data,'x',config.datasize); @@@ -630,7 -622,7 +640,7 @@@ prepareForBenchmark("LPUSH"); c = createClient(); if (!c) exit(1); - c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n"); + c->obuf = sdscat(c->obuf,"LPUSH mylist bar\r\n"); prepareClientForReply(c,REPLY_INT); createMissingClients(c); aeMain(config.el); @@@ -648,7 -640,7 +658,7 @@@ prepareForBenchmark("SADD"); c = createClient(); if (!c) exit(1); - c->obuf = sdscat(c->obuf,"SADD myset 24\r\ncounter_rand000000000000\r\n"); + c->obuf = sdscat(c->obuf,"SADD myset counter_rand000000000000\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); @@@ -666,7 -658,7 +676,7 @@@ prepareForBenchmark("LPUSH (again, in order to bench LRANGE)"); c = createClient(); if (!c) exit(1); - c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n"); + c->obuf = sdscat(c->obuf,"LPUSH mylist bar\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); diff --combined src/redis-cli.c index bc405c91,8866678b..2aad25b3 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@@ -38,7 -38,6 +38,7 @@@ #include #include #include +#include 
#include "anet.h" #include "sds.h" @@@ -46,11 -45,16 +46,12 @@@ #include "zmalloc.h" #include "linenoise.h" -#define REDIS_CMD_INLINE 1 -#define REDIS_CMD_BULK 2 -#define REDIS_CMD_MULTIBULK 4 - #define REDIS_NOTUSED(V) ((void) V) static struct config { char *hostip; int hostport; + char *hostsocket; long repeat; int dbnum; int interactive; @@@ -68,49 -72,6 +69,49 @@@ static int cliReadReply(int fd); static void usage(); +/*------------------------------------------------------------------------------ + * Utility functions + *--------------------------------------------------------------------------- */ + +static long long mstime(void) { + struct timeval tv; + long long mst; + + gettimeofday(&tv, NULL); + mst = ((long)tv.tv_sec)*1000; + mst += tv.tv_usec/1000; + return mst; +} + +static void printStringRepr(char *s, int len) { + printf("\""); + while(len--) { + switch(*s) { + case '\\': + case '"': + printf("\\%c",*s); + break; + case '\n': printf("\\n"); break; + case '\r': printf("\\r"); break; + case '\t': printf("\\t"); break; + case '\a': printf("\\a"); break; + case '\b': printf("\\b"); break; + default: + if (isprint(*s)) + printf("%c",*s); + else + printf("\\x%02x",(unsigned char)*s); + break; + } + s++; + } + printf("\""); +} + +/*------------------------------------------------------------------------------ + * Networking / parsing + *--------------------------------------------------------------------------- */ + /* Connect to the client. If force is not zero the connection is performed * even if there is already a connected socket. 
*/ static int cliConnect(int force) { @@@ -119,9 -80,17 +120,17 @@@ if (fd == ANET_ERR || force) { if (force) close(fd); - fd = anetTcpConnect(err,config.hostip,config.hostport); + if (config.hostsocket == NULL) { + fd = anetTcpConnect(err,config.hostip,config.hostport); + } else { + fd = anetUnixConnect(err,config.hostsocket); + } if (fd == ANET_ERR) { - fprintf(stderr, "Could not connect to Redis at %s:%d: %s", config.hostip, config.hostport, err); + fprintf(stderr,"Could not connect to Redis at "); + if (config.hostsocket == NULL) + fprintf(stderr,"%s:%d: %s",config.hostip,config.hostport,err); + else + fprintf(stderr,"%s: %s",config.hostsocket,err); return -1; } anetTcpNoDelay(NULL,fd); @@@ -159,6 -128,31 +168,6 @@@ static int cliReadSingleLineReply(int f return 0; } -static void printStringRepr(char *s, int len) { - printf("\""); - while(len--) { - switch(*s) { - case '\\': - case '"': - printf("\\%c",*s); - break; - case '\n': printf("\\n"); break; - case '\r': printf("\\r"); break; - case '\t': printf("\\t"); break; - case '\a': printf("\\a"); break; - case '\b': printf("\\b"); break; - default: - if (isprint(*s)) - printf("%c",*s); - else - printf("\\x%02x",(unsigned char)*s); - break; - } - s++; - } - printf("\""); -} - static int cliReadBulkReply(int fd) { sds replylen = cliReadLine(fd); char *reply, crlf[2]; @@@ -337,10 -331,6 +346,10 @@@ static int cliSendCommand(int argc, cha return 0; } +/*------------------------------------------------------------------------------ + * User interface + *--------------------------------------------------------------------------- */ + static int parseOptions(int argc, char **argv) { int i; @@@ -362,6 -352,9 +371,9 @@@ } else if (!strcmp(argv[i],"-p") && !lastarg) { config.hostport = atoi(argv[i+1]); i++; + } else if (!strcmp(argv[i],"-s") && !lastarg) { + config.hostsocket = argv[i+1]; + i++; } else if (!strcmp(argv[i],"-r") && !lastarg) { config.repeat = strtoll(argv[i+1],NULL,10); i++; @@@ -410,7 -403,7 +422,7 @@@ 
static sds readArgFromStdin(void) } static void usage() { - fprintf(stderr, "usage: redis-cli [-iv] [-h host] [-p port] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 arg3 ... argN\n"); + fprintf(stderr, "usage: redis-cli [-iv] [-h host] [-p port] [-s /path/to/socket] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 arg3 ... argN\n"); fprintf(stderr, "usage: echo \"argN\" | redis-cli -x [options] cmd arg1 arg2 ... arg(N-1)\n\n"); fprintf(stderr, "example: cat /etc/passwd | redis-cli -x set my_passwd\n"); fprintf(stderr, "example: redis-cli get my_passwd\n"); @@@ -452,7 -445,6 +464,7 @@@ static void repl() exit(0); } else { int err; + long long start_time = mstime(), elapsed; if ((err = cliSendCommand(argc, argv, 1)) != 0) { if (err == ECONNRESET) { @@@ -463,9 -455,6 +475,9 @@@ cliSendCommand(argc,argv,1); } } + elapsed = mstime()-start_time; + if (elapsed > 500) printf("%.2f seconds\n", + (double)elapsed/1000); } } /* Free the argument vector */ @@@ -497,6 -486,7 +509,7 @@@ int main(int argc, char **argv) config.hostip = "127.0.0.1"; config.hostport = 6379; + config.hostsocket = NULL; config.repeat = 1; config.dbnum = 0; config.interactive = 0; diff --combined src/redis.c index 2cbfc689,50cf2f6c..f65901c7 --- a/src/redis.c +++ b/src/redis.c @@@ -69,120 -69,119 +69,120 @@@ double R_Zero, R_PosInf, R_NegInf, R_Na struct redisServer server; /* server global state */ struct redisCommand *commandTable; struct redisCommand readonlyCommandTable[] = { - {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, - {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, - {"setex",setexCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, - {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"substr",substrCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"strlen",strlenCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"del",delCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, - 
{"exists",existsCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"mget",mgetCommand,-2,REDIS_CMD_INLINE,NULL,1,-1,1}, - {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"rpushx",rpushxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"lpushx",lpushxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"linsert",linsertCommand,5,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"rpop",rpopCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"lpop",lpopCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"brpop",brpopCommand,-3,REDIS_CMD_INLINE,NULL,1,1,1}, - {"blpop",blpopCommand,-3,REDIS_CMD_INLINE,NULL,1,1,1}, - {"llen",llenCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"lindex",lindexCommand,3,REDIS_CMD_INLINE,NULL,1,1,1}, - {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"lrange",lrangeCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"lrem",lremCommand,4,REDIS_CMD_BULK,NULL,1,1,1}, - {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,2,1}, - {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"srem",sremCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, - {"smove",smoveCommand,4,REDIS_CMD_BULK,NULL,1,2,1}, - {"sismember",sismemberCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, - {"scard",scardCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"spop",spopCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,-1,1}, - {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,2,-1,1}, - {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,-1,1}, - 
{"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,2,-1,1}, - {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,-1,1}, - {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,2,-1,1}, - {"smembers",sinterCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"zrem",zremCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, - {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zremrangebyrank",zremrangebyrankCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, - {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, - {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zcount",zcountCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zcard",zcardCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"zrank",zrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, - {"zrevrank",zrevrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, - {"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"hsetnx",hsetnxCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"hget",hgetCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, - {"hmset",hmsetCommand,-4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"hmget",hmgetCommand,-3,REDIS_CMD_BULK,NULL,1,1,1}, - {"hincrby",hincrbyCommand,4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"hdel",hdelCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, - {"hlen",hlenCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"hkeys",hkeysCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - 
{"hvals",hvalsCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"hgetall",hgetallCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"hexists",hexistsCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, - {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,-1,2}, - {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,-1,2}, - {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"select",selectCommand,2,REDIS_CMD_INLINE,NULL,0,0,0}, - {"move",moveCommand,3,REDIS_CMD_INLINE,NULL,1,1,1}, - {"rename",renameCommand,3,REDIS_CMD_INLINE,NULL,1,1,1}, - {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE,NULL,1,1,1}, - {"expire",expireCommand,3,REDIS_CMD_INLINE,NULL,0,0,0}, - {"expireat",expireatCommand,3,REDIS_CMD_INLINE,NULL,0,0,0}, - {"keys",keysCommand,2,REDIS_CMD_INLINE,NULL,0,0,0}, - {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"auth",authCommand,2,REDIS_CMD_INLINE,NULL,0,0,0}, - {"ping",pingCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"echo",echoCommand,2,REDIS_CMD_BULK,NULL,0,0,0}, - {"save",saveCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"type",typeCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"multi",multiCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"exec",execCommand,1,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,execBlockClientOnSwappedKeys,0,0,0}, - {"discard",discardCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"sync",syncCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"flushall",flushallCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - 
{"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1}, - {"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, - {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0}, - {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, - {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0}, - {"subscribe",subscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, - {"unsubscribe",unsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, - {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0}, - {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, - {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0} + {"get",getCommand,2,0,NULL,1,1,1}, + {"set",setCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0}, + {"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0}, + {"setex",setexCommand,4,REDIS_CMD_DENYOOM,NULL,0,0,0}, + {"append",appendCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"substr",substrCommand,4,0,NULL,1,1,1}, + {"strlen",strlenCommand,2,0,NULL,1,1,1}, + {"del",delCommand,-2,0,NULL,0,0,0}, + {"exists",existsCommand,2,0,NULL,1,1,1}, + {"incr",incrCommand,2,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"decr",decrCommand,2,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"mget",mgetCommand,-2,0,NULL,1,-1,1}, + {"rpush",rpushCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"lpush",lpushCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"rpushx",rpushxCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"lpushx",lpushxCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"linsert",linsertCommand,5,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"rpop",rpopCommand,2,0,NULL,1,1,1}, + {"lpop",lpopCommand,2,0,NULL,1,1,1}, + {"brpop",brpopCommand,-3,0,NULL,1,1,1}, + {"blpop",blpopCommand,-3,0,NULL,1,1,1}, + 
{"llen",llenCommand,2,0,NULL,1,1,1}, + {"lindex",lindexCommand,3,0,NULL,1,1,1}, + {"lset",lsetCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"lrange",lrangeCommand,4,0,NULL,1,1,1}, + {"ltrim",ltrimCommand,4,0,NULL,1,1,1}, + {"lrem",lremCommand,4,0,NULL,1,1,1}, + {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1}, + {"sadd",saddCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"srem",sremCommand,3,0,NULL,1,1,1}, + {"smove",smoveCommand,4,0,NULL,1,2,1}, + {"sismember",sismemberCommand,3,0,NULL,1,1,1}, + {"scard",scardCommand,2,0,NULL,1,1,1}, + {"spop",spopCommand,2,0,NULL,1,1,1}, + {"srandmember",srandmemberCommand,2,0,NULL,1,1,1}, + {"sinter",sinterCommand,-2,REDIS_CMD_DENYOOM,NULL,1,-1,1}, + {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_DENYOOM,NULL,2,-1,1}, + {"sunion",sunionCommand,-2,REDIS_CMD_DENYOOM,NULL,1,-1,1}, + {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_DENYOOM,NULL,2,-1,1}, + {"sdiff",sdiffCommand,-2,REDIS_CMD_DENYOOM,NULL,1,-1,1}, + {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_DENYOOM,NULL,2,-1,1}, + {"smembers",sinterCommand,2,0,NULL,1,1,1}, + {"zadd",zaddCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"zincrby",zincrbyCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"zrem",zremCommand,3,0,NULL,1,1,1}, + {"zremrangebyscore",zremrangebyscoreCommand,4,0,NULL,1,1,1}, + {"zremrangebyrank",zremrangebyrankCommand,4,0,NULL,1,1,1}, + {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, + {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, + {"zrange",zrangeCommand,-4,0,NULL,1,1,1}, + {"zrangebyscore",zrangebyscoreCommand,-4,0,NULL,1,1,1}, + {"zrevrangebyscore",zrevrangebyscoreCommand,-4,0,NULL,1,1,1}, + {"zcount",zcountCommand,4,0,NULL,1,1,1}, + {"zrevrange",zrevrangeCommand,-4,0,NULL,1,1,1}, + {"zcard",zcardCommand,2,0,NULL,1,1,1}, + {"zscore",zscoreCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"zrank",zrankCommand,3,0,NULL,1,1,1}, + 
{"zrevrank",zrevrankCommand,3,0,NULL,1,1,1}, + {"hset",hsetCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"hsetnx",hsetnxCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"hget",hgetCommand,3,0,NULL,1,1,1}, + {"hmset",hmsetCommand,-4,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"hmget",hmgetCommand,-3,0,NULL,1,1,1}, + {"hincrby",hincrbyCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"hdel",hdelCommand,3,0,NULL,1,1,1}, + {"hlen",hlenCommand,2,0,NULL,1,1,1}, + {"hkeys",hkeysCommand,2,0,NULL,1,1,1}, + {"hvals",hvalsCommand,2,0,NULL,1,1,1}, + {"hgetall",hgetallCommand,2,0,NULL,1,1,1}, + {"hexists",hexistsCommand,3,0,NULL,1,1,1}, + {"incrby",incrbyCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"decrby",decrbyCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"getset",getsetCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"mset",msetCommand,-3,REDIS_CMD_DENYOOM,NULL,1,-1,2}, + {"msetnx",msetnxCommand,-3,REDIS_CMD_DENYOOM,NULL,1,-1,2}, + {"randomkey",randomkeyCommand,1,0,NULL,0,0,0}, + {"select",selectCommand,2,0,NULL,0,0,0}, + {"move",moveCommand,3,0,NULL,1,1,1}, + {"rename",renameCommand,3,0,NULL,1,1,1}, + {"renamenx",renamenxCommand,3,0,NULL,1,1,1}, + {"expire",expireCommand,3,0,NULL,0,0,0}, + {"expireat",expireatCommand,3,0,NULL,0,0,0}, + {"keys",keysCommand,2,0,NULL,0,0,0}, + {"dbsize",dbsizeCommand,1,0,NULL,0,0,0}, + {"auth",authCommand,2,0,NULL,0,0,0}, + {"ping",pingCommand,1,0,NULL,0,0,0}, + {"echo",echoCommand,2,0,NULL,0,0,0}, + {"save",saveCommand,1,0,NULL,0,0,0}, + {"bgsave",bgsaveCommand,1,0,NULL,0,0,0}, + {"bgrewriteaof",bgrewriteaofCommand,1,0,NULL,0,0,0}, + {"shutdown",shutdownCommand,1,0,NULL,0,0,0}, + {"lastsave",lastsaveCommand,1,0,NULL,0,0,0}, + {"type",typeCommand,2,0,NULL,1,1,1}, + {"multi",multiCommand,1,0,NULL,0,0,0}, + {"exec",execCommand,1,REDIS_CMD_DENYOOM,execBlockClientOnSwappedKeys,0,0,0}, + {"discard",discardCommand,1,0,NULL,0,0,0}, + {"sync",syncCommand,1,0,NULL,0,0,0}, + {"flushdb",flushdbCommand,1,0,NULL,0,0,0}, + {"flushall",flushallCommand,1,0,NULL,0,0,0}, + 
{"sort",sortCommand,-2,REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"info",infoCommand,1,0,NULL,0,0,0}, + {"monitor",monitorCommand,1,0,NULL,0,0,0}, + {"ttl",ttlCommand,2,0,NULL,1,1,1}, + {"persist",persistCommand,2,0,NULL,1,1,1}, + {"slaveof",slaveofCommand,3,0,NULL,0,0,0}, + {"debug",debugCommand,-2,0,NULL,0,0,0}, + {"config",configCommand,-2,0,NULL,0,0,0}, + {"subscribe",subscribeCommand,-2,0,NULL,0,0,0}, + {"unsubscribe",unsubscribeCommand,-1,0,NULL,0,0,0}, + {"psubscribe",psubscribeCommand,-2,0,NULL,0,0,0}, + {"punsubscribe",punsubscribeCommand,-1,0,NULL,0,0,0}, + {"publish",publishCommand,3,REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0}, + {"watch",watchCommand,-2,0,NULL,0,0,0}, + {"unwatch",unwatchCommand,1,0,NULL,0,0,0} }; /*============================ Utility functions ============================ */ @@@ -479,10 -478,6 +479,10 @@@ void activeExpireCycle(void) } } +void updateLRUClock(void) { + server.lruclock = (time(NULL)/REDIS_LRU_CLOCK_RESOLUTION) & + REDIS_LRU_CLOCK_MAX; +} int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j, loops = server.cronloops++; @@@ -495,19 -490,19 +495,19 @@@ * in objects at every object access, and accuracy is not needed. * To access a global var is faster than calling time(NULL) */ server.unixtime = time(NULL); - /* We have just 21 bits per object for LRU information. - * So we use an (eventually wrapping) LRU clock with minutes resolution. + /* We have just 22 bits per object for LRU information. + * So we use an (eventually wrapping) LRU clock with 10 seconds resolution. + * 2^22 bits with 10 seconds resoluton is more or less 1.5 years. * - * When we need to select what object to swap, we compute the minimum - * time distance between the current lruclock and the object last access - * lruclock info. Even if clocks will wrap on overflow, there is - * the interesting property that we are sure that at least - * ABS(A-B) minutes passed between current time and timestamp B. 
+ * Note that even if this will wrap after 1.5 years it's not a problem, + * everything will still work but just some object will appear younger + * to Redis. But for this to happen a given object should never be touched + * for 1.5 years. * - * This is not precise but we don't need at all precision, but just - * something statistically reasonable. + * Note that you can change the resolution altering the + * REDIS_LRU_CLOCK_RESOLUTION define. */ - server.lruclock = (time(NULL)/60)&((1<<21)-1); + updateLRUClock(); /* We received a SIGTERM, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ @@@ -714,13 -709,16 +714,16 @@@ void createSharedObjects(void) } void initServerConfig() { - server.dbnum = REDIS_DEFAULT_DBNUM; server.port = REDIS_SERVERPORT; + server.bindaddr = NULL; + server.unixsocket = NULL; + server.ipfd = -1; + server.sofd = -1; + server.dbnum = REDIS_DEFAULT_DBNUM; server.verbosity = REDIS_VERBOSE; server.maxidletime = REDIS_MAXIDLETIME; server.saveparams = NULL; server.logfile = NULL; /* NULL = log on standard output */ - server.bindaddr = NULL; server.glueoutputbuf = 1; server.daemonize = 0; server.appendonly = 0; @@@ -738,8 -736,6 +741,8 @@@ server.maxclients = 0; server.blpop_blocked_clients = 0; server.maxmemory = 0; + server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU; + server.maxmemory_samples = 3; server.vm_enabled = 0; server.vm_swap_file = zstrdup("/tmp/redis-%p.vm"); server.vm_page_size = 256; /* 256 bytes per page */ @@@ -754,7 -750,6 +757,7 @@@ server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES; server.shutdown_asap = 0; + updateLRUClock(); resetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@@ -795,9 -790,21 +798,21 @@@ void initServer() createSharedObjects(); server.el = aeCreateEventLoop(); server.db = zmalloc(sizeof(redisDb)*server.dbnum); - server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); - if (server.fd == -1) { - 
redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr); + server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr); + if (server.ipfd == ANET_ERR) { + redisLog(REDIS_WARNING, "Opening port: %s", server.neterr); + exit(1); + } + if (server.unixsocket != NULL) { + unlink(server.unixsocket); /* don't care if this fails */ + server.sofd = anetUnixServer(server.neterr,server.unixsocket); + if (server.sofd == ANET_ERR) { + redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); + exit(1); + } + } + if (server.ipfd < 0 && server.sofd < 0) { + redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } for (j = 0; j < server.dbnum; j++) { @@@ -824,12 -831,12 +839,14 @@@ server.stat_numconnections = 0; server.stat_expiredkeys = 0; server.stat_starttime = time(NULL); + server.stat_keyspace_misses = 0; + server.stat_keyspace_hits = 0; server.unixtime = time(NULL); aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); - if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, - acceptHandler, NULL) == AE_ERR) oom("creating file event"); + if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, + acceptTcpHandler,NULL) == AE_ERR) oom("creating file event"); + if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, + acceptUnixHandler,NULL) == AE_ERR) oom("creating file event"); if (server.appendonly) { server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); @@@ -899,14 -906,84 +916,14 @@@ void call(redisClient *c, struct redisC int processCommand(redisClient *c) { struct redisCommand *cmd; - /* Handle the multi bulk command type. This is an alternative protocol - * supported by Redis in order to receive commands that are composed of - * multiple binary-safe "bulk" arguments. The latency of processing is - * a bit higher but this allows things like multi-sets, so if this - * protocol is used only for MSET and similar commands this is a big win. 
*/ - if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') { - c->multibulk = atoi(((char*)c->argv[0]->ptr)+1); - if (c->multibulk <= 0) { - resetClient(c); - return 1; - } else { - decrRefCount(c->argv[c->argc-1]); - c->argc--; - return 1; - } - } else if (c->multibulk) { - if (c->bulklen == -1) { - if (((char*)c->argv[0]->ptr)[0] != '$') { - addReplyError(c,"multi bulk protocol error"); - resetClient(c); - return 1; - } else { - char *eptr; - long bulklen = strtol(((char*)c->argv[0]->ptr)+1,&eptr,10); - int perr = eptr[0] != '\0'; - - decrRefCount(c->argv[0]); - if (perr || bulklen == LONG_MIN || bulklen == LONG_MAX || - bulklen < 0 || bulklen > 1024*1024*1024) - { - c->argc--; - addReplyError(c,"invalid bulk write count"); - resetClient(c); - return 1; - } - c->argc--; - c->bulklen = bulklen+2; /* add two bytes for CR+LF */ - return 1; - } - } else { - c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1)); - c->mbargv[c->mbargc] = c->argv[0]; - c->mbargc++; - c->argc--; - c->multibulk--; - if (c->multibulk == 0) { - robj **auxargv; - int auxargc; - - /* Here we need to swap the multi-bulk argc/argv with the - * normal argc/argv of the client structure. */ - auxargv = c->argv; - c->argv = c->mbargv; - c->mbargv = auxargv; - - auxargc = c->argc; - c->argc = c->mbargc; - c->mbargc = auxargc; - - /* We need to set bulklen to something different than -1 - * in order for the code below to process the command without - * to try to read the last argument of a bulk command as - * a special argument. */ - c->bulklen = 0; - /* continue below and process the command */ - } else { - c->bulklen = -1; - return 1; - } - } - } - /* -- end of multi bulk commands processing -- */ - - /* The QUIT command is handled as a special case. Normal command - * procs are unable to close the client connection safely */ + /* The QUIT command is handled separately. 
Normal command procs will + * go through checking for replication and QUIT will cause trouble + * when FORCE_REPLICATION is enabled and would be implemented in + * a regular command proc. */ if (!strcasecmp(c->argv[0]->ptr,"quit")) { - freeClient(c); - return 0; + addReply(c,shared.ok); + c->flags |= REDIS_CLOSE_AFTER_REPLY; + return REDIS_ERR; } /* Now lookup the command and check ASAP about trivial error conditions @@@ -915,18 -992,55 +932,18 @@@ if (!cmd) { addReplyErrorFormat(c,"unknown command '%s'", (char*)c->argv[0]->ptr); - resetClient(c); - return 1; + return REDIS_OK; } else if ((cmd->arity > 0 && cmd->arity != c->argc) || (c->argc < -cmd->arity)) { addReplyErrorFormat(c,"wrong number of arguments for '%s' command", cmd->name); - resetClient(c); - return 1; - } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { - /* This is a bulk command, we have to read the last argument yet. */ - char *eptr; - long bulklen = strtol(c->argv[c->argc-1]->ptr,&eptr,10); - int perr = eptr[0] != '\0'; - - decrRefCount(c->argv[c->argc-1]); - if (perr || bulklen == LONG_MAX || bulklen == LONG_MIN || - bulklen < 0 || bulklen > 1024*1024*1024) - { - c->argc--; - addReplyError(c,"invalid bulk write count"); - resetClient(c); - return 1; - } - c->argc--; - c->bulklen = bulklen+2; /* add two bytes for CR+LF */ - /* It is possible that the bulk read is already in the - * buffer. Check this condition and handle it accordingly. - * This is just a fast path, alternative to call processInputBuffer(). - * It's a good idea since the code is small and this condition - * happens most of the times. */ - if ((signed)sdslen(c->querybuf) >= c->bulklen) { - c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2); - c->argc++; - c->querybuf = sdsrange(c->querybuf,c->bulklen,-1); - } else { - /* Otherwise return... there is to read the last argument - * from the socket. */ - return 1; - } + return REDIS_OK; } - /* Let's try to encode the bulk object to save space. 
*/ - if (cmd->flags & REDIS_CMD_BULK) - c->argv[c->argc-1] = tryObjectEncoding(c->argv[c->argc-1]); /* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { addReplyError(c,"operation not permitted"); - resetClient(c); - return 1; + return REDIS_OK; } /* Handle the maxmemory directive. @@@ -939,7 -1053,8 +956,7 @@@ zmalloc_used_memory() > server.maxmemory) { addReplyError(c,"command not allowed when used memory > 'maxmemory'"); - resetClient(c); - return 1; + return REDIS_OK; } /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ @@@ -948,7 -1063,8 +965,7 @@@ cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand && cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); - resetClient(c); - return 1; + return REDIS_OK; } /* Exec the command */ @@@ -960,10 -1076,13 +977,10 @@@ addReply(c,shared.queued); } else { if (server.vm_enabled && server.vm_max_threads > 0 && - blockClientOnSwappedKeys(c,cmd)) return 1; + blockClientOnSwappedKeys(c,cmd)) return REDIS_ERR; call(c,cmd); } - - /* Prepare the client for the next command */ - resetClient(c); - return 1; + return REDIS_OK; } /*================================== Shutdown =============================== */ @@@ -982,7 -1101,7 +999,7 @@@ int prepareForShutdown() /* Append only file: fsync() the AOF and exit */ aof_fsync(server.appendfd); if (server.vm_enabled) unlink(server.vm_swap_file); - } else { + } else if (server.saveparamslen > 0) { /* Snapshotting. Perform a SYNC SAVE and exit */ if (rdbSave(server.dbfilename) != REDIS_OK) { /* Ooops.. error saving! 
The best we can do is to continue @@@ -993,8 -1112,6 +1010,8 @@@ redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); return REDIS_ERR; } + } else { + redisLog(REDIS_WARNING,"Not saving DB."); } if (server.daemonize) unlink(server.pidfile); redisLog(REDIS_WARNING,"Server exit now, bye bye..."); @@@ -1065,7 -1182,6 +1082,7 @@@ sds genRedisInfoString(void) "process_id:%ld\r\n" "uptime_in_seconds:%ld\r\n" "uptime_in_days:%ld\r\n" + "lru_clock:%ld\r\n" "used_cpu_sys:%.2f\r\n" "used_cpu_user:%.2f\r\n" "used_cpu_sys_childrens:%.2f\r\n" @@@ -1075,9 -1191,7 +1092,9 @@@ "blocked_clients:%d\r\n" "used_memory:%zu\r\n" "used_memory_human:%s\r\n" + "used_memory_rss:%zu\r\n" "mem_fragmentation_ratio:%.2f\r\n" + "use_tcmalloc:%d\r\n" "changes_since_last_save:%lld\r\n" "bgsave_in_progress:%d\r\n" "last_save_time:%ld\r\n" @@@ -1085,8 -1199,6 +1102,8 @@@ "total_connections_received:%lld\r\n" "total_commands_processed:%lld\r\n" "expired_keys:%lld\r\n" + "keyspace_hits:%lld\r\n" + "keyspace_misses:%lld\r\n" "hash_max_zipmap_entries:%zu\r\n" "hash_max_zipmap_value:%zu\r\n" "pubsub_channels:%ld\r\n" @@@ -1101,7 -1213,6 +1118,7 @@@ (long) getpid(), uptime, uptime/(3600*24), + (unsigned long) server.lruclock, (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000, (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000, (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000, @@@ -1111,13 -1222,7 +1128,13 @@@ server.blpop_blocked_clients, zmalloc_used_memory(), hmem, + zmalloc_get_rss(), zmalloc_get_fragmentation_ratio(), +#ifdef USE_TCMALLOC + 1, +#else + 0, +#endif server.dirty, server.bgsavechildpid != -1, server.lastsave, @@@ -1125,8 -1230,6 +1142,8 @@@ server.stat_numconnections, server.stat_numcommands, server.stat_expiredkeys, + server.stat_keyspace_hits, + server.stat_keyspace_misses, server.hash_max_zipmap_entries, server.hash_max_zipmap_value, dictSize(server.pubsub_channels), @@@ -1243,93 -1346,10 +1260,93 @@@ int 
tryFreeOneObjectFromFreelist(void) * memory usage. */ void freeMemoryIfNeeded(void) { + /* Remove keys accordingly to the active policy as long as we are + * over the memory limit. */ while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) { int j, k, freed = 0; + /* Basic strategy -- remove objects from the free list. */ if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue; + + for (j = 0; j < server.dbnum; j++) { + long bestval = 0; /* just to prevent warning */ + sds bestkey = NULL; + struct dictEntry *de; + redisDb *db = server.db+j; + dict *dict; + + if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU || + server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM) + { + dict = server.db[j].dict; + } else { + dict = server.db[j].expires; + } + if (dictSize(dict) == 0) continue; + + /* volatile-random and allkeys-random policy */ + if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM || + server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM) + { + de = dictGetRandomKey(dict); + bestkey = dictGetEntryKey(de); + } + + /* volatile-lru and allkeys-lru policy */ + else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU || + server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) + { + for (k = 0; k < server.maxmemory_samples; k++) { + sds thiskey; + long thisval; + robj *o; + + de = dictGetRandomKey(dict); + thiskey = dictGetEntryKey(de); + o = dictGetEntryVal(de); + thisval = estimateObjectIdleTime(o); + + /* Higher idle time is better candidate for deletion */ + if (bestkey == NULL || thisval > bestval) { + bestkey = thiskey; + bestval = thisval; + } + } + } + + /* volatile-ttl */ + else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) { + for (k = 0; k < server.maxmemory_samples; k++) { + sds thiskey; + long thisval; + + de = dictGetRandomKey(dict); + thiskey = dictGetEntryKey(de); + thisval = (long) dictGetEntryVal(de); + + /* Expire sooner (minor expire unix timestamp) is better + * candidate for 
deletion */ + if (bestkey == NULL || thisval < bestval) { + bestkey = thiskey; + bestval = thisval; + } + } + } + + /* Finally remove the selected key. */ + if (bestkey) { + robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); + dbDelete(db,keyobj); + server.stat_expiredkeys++; + decrRefCount(keyobj); + freed++; + } + } + if (!freed) return; /* nothing to free... */ + } + + while(0) { + int j, k, freed = 0; for (j = 0; j < server.dbnum; j++) { int minttl = -1; sds minkey = NULL; @@@ -1453,7 -1473,10 +1470,10 @@@ int main(int argc, char **argv) if (rdbLoad(server.dbfilename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from disk: %ld seconds",time(NULL)-start); } - redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); + if (server.ipfd > 0) + redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); + if (server.sofd > 0) + redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); aeDeleteEventLoop(server.el); @@@ -1497,7 -1520,6 +1517,7 @@@ void segvHandler(int sig, siginfo_t *in int i, trace_size = 0; ucontext_t *uc = (ucontext_t*) secret; sds infostring; + struct sigaction act; REDIS_NOTUSED(info); redisLog(REDIS_WARNING, @@@ -1519,16 -1541,7 +1539,16 @@@ /* free(messages); Don't call free() with possibly corrupted memory. */ if (server.daemonize) unlink(server.pidfile); - _exit(0); + + /* Make sure we exit with the right signal at the end. So for instance + * the core will be dumped if enabled. */ + sigemptyset (&act.sa_mask); + /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction + * is used. 
Otherwise, sa_handler is used */ + act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND; + act.sa_handler = SIG_DFL; + sigaction (sig, &act, NULL); + kill(getpid(),sig); } void sigtermHandler(int sig) { diff --combined src/redis.h index cf21447d,8e05a4d4..6fa3e49f --- a/src/redis.h +++ b/src/redis.h @@@ -57,15 -57,15 +57,15 @@@ /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ -/* Command flags */ -#define REDIS_CMD_BULK 1 /* Bulk write command */ -#define REDIS_CMD_INLINE 2 /* Inline command */ -/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with - this flags will return an error when the 'maxmemory' option is set in the - config file and the server is using more than maxmemory bytes of memory. - In short this commands are denied on low memory conditions. */ -#define REDIS_CMD_DENYOOM 4 -#define REDIS_CMD_FORCE_REPLICATION 8 /* Force replication even if dirty is 0 */ +/* Command flags: + * REDIS_CMD_DENYOOM: + * Commands marked with this flag will return an error when 'maxmemory' is + * set and the server is using more than 'maxmemory' bytes of memory. + * In short: commands with this flag are denied on low memory conditions. + * REDIS_CMD_FORCE_REPLICATION: + * Force replication even if dirty is 0. */ +#define REDIS_CMD_DENYOOM 4 +#define REDIS_CMD_FORCE_REPLICATION 8 /* Object types */ #define REDIS_STRING 0 @@@ -144,11 -144,6 +144,11 @@@ #define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */ #define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ #define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */ +#define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. 
*/ + +/* Client request types */ +#define REDIS_REQ_INLINE 1 +#define REDIS_REQ_MULTIBULK 2 /* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ @@@ -203,13 -198,6 +203,13 @@@ #define REDIS_OP_DIFF 1 #define REDIS_OP_INTER 2 +/* Redis maxmemory strategies */ +#define REDIS_MAXMEMORY_VOLATILE_LRU 0 +#define REDIS_MAXMEMORY_VOLATILE_TTL 1 +#define REDIS_MAXMEMORY_VOLATILE_RANDOM 2 +#define REDIS_MAXMEMORY_ALLKEYS_LRU 3 +#define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4 + /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) #define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1) @@@ -223,8 -211,6 +223,8 @@@ void _redisPanic(char *msg, char *file /* A redis object, that is a type able to hold a string / list / set */ /* The actual Redis Object */ +#define REDIS_LRU_CLOCK_MAX ((1<<21)-1) /* Max value of obj->lru */ +#define REDIS_LRU_CLOCK_RESOLUTION 10 /* LRU clock resolution in seconds */ typedef struct redisObject { unsigned type:4; unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */ @@@ -299,11 -285,11 +299,11 @@@ typedef struct redisClient redisDb *db; int dictid; sds querybuf; - robj **argv, **mbargv; - char *newline; /* pointing to the detected newline in querybuf */ - int argc, mbargc; - long bulklen; /* bulk read len. 
-1 if not in bulk read mode */ - int multibulk; /* multi bulk command format active */ + int argc; + robj **argv; + int reqtype; + int multibulklen; /* number of multi bulk arguments left to read */ + long bulklen; /* length of bulk argument in multi bulk request */ list *reply; int sentlen; time_t lastinteraction; /* time of the last interaction, used for timeout */ @@@ -352,7 -338,10 +352,10 @@@ struct sharedObjectsStruct struct redisServer { pthread_t mainthread; int port; - int fd; + char *bindaddr; + char *unixsocket; + int ipfd; + int sofd; redisDb *db; long long dirty; /* changes to DB from the last save */ long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ @@@ -362,14 -351,12 +365,14 @@@ aeEventLoop *el; int cronloops; /* number of times the cron function run */ list *objfreelist; /* A list of freed objects to avoid malloc() */ - time_t lastsave; /* Unix time of last save succeeede */ + time_t lastsave; /* Unix time of last save succeeede */ /* Fields used only for stats */ - time_t stat_starttime; /* server start time */ - long long stat_numcommands; /* number of processed commands */ - long long stat_numconnections; /* number of connections received */ - long long stat_expiredkeys; /* number of expired keys */ + time_t stat_starttime; /* server start time */ + long long stat_numcommands; /* number of processed commands */ + long long stat_numconnections; /* number of connections received */ + long long stat_expiredkeys; /* number of expired keys */ + long long stat_keyspace_hits; /* number of successful lookups of keys */ + long long stat_keyspace_misses; /* number of failed lookups of keys */ /* Configuration */ int verbosity; int glueoutputbuf; @@@ -391,7 -378,6 +394,6 @@@ struct saveparam *saveparams; int saveparamslen; char *logfile; - char *bindaddr; char *dbfilename; char *appendfilename; char *requirepass; @@@ -406,8 -392,6 +408,8 @@@ int replstate; unsigned int maxclients; unsigned long long maxmemory; + int 
maxmemory_policy; + int maxmemory_samples; unsigned int blpop_blocked_clients; unsigned int vm_blocked_clients; /* Sort parameters - qsort_r() is only available under BSD so we @@@ -617,7 -601,8 +619,8 @@@ void *addDeferredMultiBulkLength(redisC void setDeferredMultiBulkLength(redisClient *c, void *node, long length); void addReplySds(redisClient *c, sds s); void processInputBuffer(redisClient *c); - void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); + void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); + void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); void addReplyBulk(redisClient *c, robj *obj); void addReplyBulkCString(redisClient *c, char *s); @@@ -696,16 -681,6 +699,16 @@@ int getLongLongFromObject(robj *o, lon char *strEncoding(int encoding); int compareStringObjects(robj *a, robj *b); int equalStringObjects(robj *a, robj *b); +unsigned long estimateObjectIdleTime(robj *o); + +/* Synchronous I/O with timeout */ +int syncWrite(int fd, char *ptr, ssize_t size, int timeout); +int syncRead(int fd, char *ptr, ssize_t size, int timeout); +int syncReadLine(int fd, char *ptr, ssize_t size, int timeout); +int fwriteBulkString(FILE *fp, char *s, unsigned long len); +int fwriteBulkDouble(FILE *fp, double d); +int fwriteBulkLongLong(FILE *fp, long long l); +int fwriteBulkObject(FILE *fp, robj *obj); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); @@@ -924,7 -899,6 +927,7 @@@ void zaddCommand(redisClient *c) void zincrbyCommand(redisClient *c); void zrangeCommand(redisClient *c); void zrangebyscoreCommand(redisClient *c); +void zrevrangebyscoreCommand(redisClient *c); void zcountCommand(redisClient *c); void zrevrangeCommand(redisClient *c); void zcardCommand(redisClient *c);