From: Pieter Noordhuis Date: Wed, 13 Oct 2010 16:55:46 +0000 (+0200) Subject: Merge master with resolved conflict in src/redis-cli.c X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/b04ce2a35ce084a043ef8749ca4fa0e62b92bd03?hp=-c Merge master with resolved conflict in src/redis-cli.c --- b04ce2a35ce084a043ef8749ca4fa0e62b92bd03 diff --combined src/aof.c index 942d4afd,16713481..eb67a7bd --- a/src/aof.c +++ b/src/aof.c @@@ -189,6 -189,7 +189,7 @@@ struct redisClient *createFakeClient(vo c->querybuf = sdsempty(); c->argc = 0; c->argv = NULL; + c->bufpos = 0; c->flags = 0; /* We set the fake client as a slave waiting for the synchronization * so that Redis will not try to send replies to this client. */ @@@ -272,12 -273,14 +273,14 @@@ int loadAppendOnlyFile(char *filename) fakeClient->argc = argc; fakeClient->argv = argv; cmd->proc(fakeClient); - /* Discard the reply objects list from the fake client */ - while(listLength(fakeClient->reply)) - listDelNode(fakeClient->reply,listFirst(fakeClient->reply)); + + /* The fake client should not have a reply */ + redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0); + /* Clean up, ready for the next command */ for (j = 0; j < argc; j++) decrRefCount(argv[j]); zfree(argv); + /* Handle swapping while loading big datasets when VM is on */ force_swapout = 0; if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) @@@ -307,7 -310,7 +310,7 @@@ readerr } exit(1); fmterr: - redisLog(REDIS_WARNING,"Bad file format reading the append only file"); + redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix "); exit(1); } @@@ -463,20 -466,30 +466,30 @@@ int rewriteAppendOnlyFile(char *filenam redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { - /* Emit the SADDs needed to rebuild the set */ - dict *set = o->ptr; - dictIterator *di = dictGetIterator(set); - dictEntry *de; + char cmd[]="*3\r\n$4\r\nSADD\r\n"; - while((de = dictNext(di)) != NULL) { - char cmd[]="*3\r\n$4\r\nSADD\r\n"; - robj *eleobj = dictGetEntryKey(de); - - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + /* Emit the SADDs needed to rebuild the set */ + if (o->encoding == REDIS_ENCODING_INTSET) { + int ii = 0; + int64_t llval; + while(intsetGet(o->ptr,ii++,&llval)) { + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkLongLong(fp,llval) == 0) goto werr; + } + } else if (o->encoding == REDIS_ENCODING_HT) { + dictIterator *di = dictGetIterator(o->ptr); + dictEntry *de; + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetEntryKey(de); + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + } + dictReleaseIterator(di); + } else { + redisPanic("Unknown set encoding"); } - dictReleaseIterator(di); } else if (o->type == REDIS_ZSET) { /* Emit the ZADDs needed to rebuild the sorted set */ zset *zs = o->ptr; @@@ -588,8 -601,7 +601,8 @@@ int rewriteAppendOnlyFileBackground(voi char tmpfile[256]; if (server.vm_enabled) vmReopenSwapFile(); - close(server.fd); + if (server.ipfd > 0) close(server.ipfd); + if (server.sofd > 0) close(server.sofd); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { _exit(0); @@@ -620,12 -632,11 +633,11 @@@ void bgrewriteaofCommand(redisClient *c) { if (server.bgrewritechildpid != -1) { - addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n")); + addReplyError(c,"Background append only file rewriting already in progress"); return; } if (rewriteAppendOnlyFileBackground() == REDIS_OK) { - char *status = "+Background append only file rewriting started\r\n"; - addReplySds(c,sdsnew(status)); + addReplyStatus(c,"Background append only file rewriting started"); } else { addReply(c,shared.err); } diff --combined src/config.c index eeec9e8c,1bd678c7..4257fc36 --- a/src/config.c +++ b/src/config.c @@@ -71,8 -71,6 +71,8 @@@ void loadServerConfig(char *filename) } } else if (!strcasecmp(argv[0],"bind") && argc == 2) { server.bindaddr = zstrdup(argv[1]); + } else if (!strcasecmp(argv[0],"unixsocket") && argc == 2) { + server.unixsocket = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"save") && argc == 3) { int seconds = atoi(argv[1]); int changes = atoi(argv[2]); @@@ -201,6 -199,8 +201,8 @@@ server.list_max_ziplist_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2){ server.list_max_ziplist_value = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"set-max-intset-entries") && argc == 2){ + server.set_max_intset_entries = memtoll(argv[1], NULL); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@@ -241,6 -241,7 +243,7 @@@ void configSetCommand(redisClient *c) if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0) goto badfmt; server.maxmemory = ll; + if (server.maxmemory) freeMemoryIfNeeded(); } else if (!strcasecmp(c->argv[2]->ptr,"timeout")) { if (getLongLongFromObject(o,&ll) == REDIS_ERR || ll < 0 || ll > LONG_MAX) goto badfmt; @@@ -270,8 -271,8 +273,8 @@@ stopAppendOnly(); } else { if (startAppendOnly() == REDIS_ERR) { - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR Unable to turn on AOF. Check server logs.\r\n")); + addReplyError(c, + "Unable to turn on AOF. Check server logs."); decrRefCount(o); return; } @@@ -312,9 -313,8 +315,8 @@@ } sdsfreesplitres(v,vlen); } else { - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR not supported CONFIG parameter %s\r\n", - (char*)c->argv[2]->ptr)); + addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s", + (char*)c->argv[2]->ptr); decrRefCount(o); return; } @@@ -323,22 -323,18 +325,18 @@@ return; badfmt: /* Bad format errors */ - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n", + addReplyErrorFormat(c,"Invalid argument '%s' for CONFIG SET '%s'", (char*)o->ptr, - (char*)c->argv[2]->ptr)); + (char*)c->argv[2]->ptr); decrRefCount(o); } void configGetCommand(redisClient *c) { robj *o = getDecodedObject(c->argv[2]); - robj *lenobj = createObject(REDIS_STRING,NULL); + void *replylen = addDeferredMultiBulkLength(c); char *pattern = o->ptr; int matches = 0; - addReply(c,lenobj); - decrRefCount(lenobj); - if (stringmatch(pattern,"dbfilename",0)) { addReplyBulkCString(c,"dbfilename"); addReplyBulkCString(c,server.dbfilename); @@@ -410,7 -406,7 +408,7 @@@ matches++; } decrRefCount(o); - lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2); + setDeferredMultiBulkLength(c,replylen,matches*2); } void configCommand(redisClient *c) { @@@ -428,13 -424,12 +426,12 @@@ server.stat_starttime = time(NULL); addReply(c,shared.ok); } else { - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n")); + addReplyError(c, + "CONFIG subcommand must be one of GET, SET, RESETSTAT"); } return; badarity: - addReplySds(c,sdscatprintf(sdsempty(), - "-ERR Wrong number of arguments for CONFIG %s\r\n", - (char*) c->argv[1]->ptr)); + addReplyErrorFormat(c,"Wrong number of arguments for CONFIG %s", + (char*) c->argv[1]->ptr); } diff --combined src/networking.c index 4e93186e,632fd047..d1c6a75a --- a/src/networking.c +++ b/src/networking.c @@@ -1,5 -1,4 +1,4 @@@ #include "redis.h" - #include void *dupClientReplyValue(void *o) { @@@ -12,14 -11,24 +11,24 @@@ int listMatchObjects(void *a, void *b) } redisClient *createClient(int fd) { - redisClient *c = zmalloc(sizeof(*c)); + redisClient *c = zmalloc(sizeof(redisClient)); + c->bufpos = 0; anetNonBlock(NULL,fd); anetTcpNoDelay(NULL,fd); if (!c) return NULL; + if (aeCreateFileEvent(server.el,fd,AE_READABLE, + readQueryFromClient, c) == AE_ERR) + { + close(fd); + zfree(c); + return NULL; + } + selectDb(c,0); c->fd = fd; c->querybuf = sdsempty(); + c->newline = NULL; c->argc = 0; c->argv = NULL; c->bulklen = -1; @@@ -43,80 -52,254 +52,254 @@@ c->pubsub_patterns = listCreate(); listSetFreeMethod(c->pubsub_patterns,decrRefCount); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); - if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, - readQueryFromClient, c) == AE_ERR) { - freeClient(c); - return NULL; - } listAddNodeTail(server.clients,c); initClientMultiState(c); return c; } - void addReply(redisClient *c, robj *obj) { - if (listLength(c->reply) == 0 && + int _installWriteEvent(redisClient *c) { + if (c->fd <= 0) return REDIS_ERR; + if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, - sendReplyToClient, c) == AE_ERR) return; + sendReplyToClient, c) == AE_ERR) return REDIS_ERR; + return REDIS_OK; + } - if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) { - obj = dupStringObject(obj); - obj->refcount = 0; /* getDecodedObject() will increment the refcount */ + /* Create a duplicate of the last object in the reply list when + * it is not exclusively owned by the reply list. */ + robj *dupLastObjectIfNeeded(list *reply) { + robj *new, *cur; + listNode *ln; + redisAssert(listLength(reply) > 0); + ln = listLast(reply); + cur = listNodeValue(ln); + if (cur->refcount > 1) { + new = dupStringObject(cur); + decrRefCount(cur); + listNodeValue(ln) = new; } - listAddNodeTail(c->reply,getDecodedObject(obj)); + return listNodeValue(ln); } - void addReplySds(redisClient *c, sds s) { - robj *o = createObject(REDIS_STRING,s); - addReply(c,o); - decrRefCount(o); + int _addReplyToBuffer(redisClient *c, char *s, size_t len) { + size_t available = sizeof(c->buf)-c->bufpos; + + /* If there already are entries in the reply list, we cannot + * add anything more to the static buffer. */ + if (listLength(c->reply) > 0) return REDIS_ERR; + + /* Check that the buffer has enough space available for this string. */ + if (len > available) return REDIS_ERR; + + memcpy(c->buf+c->bufpos,s,len); + c->bufpos+=len; + return REDIS_OK; } - void addReplyDouble(redisClient *c, double d) { - char buf[128]; + void _addReplyObjectToList(redisClient *c, robj *o) { + robj *tail; + if (listLength(c->reply) == 0) { + incrRefCount(o); + listAddNodeTail(c->reply,o); + } else { + tail = listNodeValue(listLast(c->reply)); - snprintf(buf,sizeof(buf),"%.17g",d); - addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n", - (unsigned long) strlen(buf),buf)); + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); + } else { + incrRefCount(o); + listAddNodeTail(c->reply,o); + } + } } - void addReplyLongLong(redisClient *c, long long ll) { - char buf[128]; - size_t len; + /* This method takes responsibility over the sds. When it is no longer + * needed it will be free'd, otherwise it ends up in a robj. */ + void _addReplySdsToList(redisClient *c, sds s) { + robj *tail; + if (listLength(c->reply) == 0) { + listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); + } else { + tail = listNodeValue(listLast(c->reply)); - if (ll == 0) { - addReply(c,shared.czero); - return; - } else if (ll == 1) { - addReply(c,shared.cone); + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,s,sdslen(s)); + sdsfree(s); + } else { + listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); + } + } + } + + void _addReplyStringToList(redisClient *c, char *s, size_t len) { + robj *tail; + if (listLength(c->reply) == 0) { + listAddNodeTail(c->reply,createStringObject(s,len)); + } else { + tail = listNodeValue(listLast(c->reply)); + + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,s,len); + } else { + listAddNodeTail(c->reply,createStringObject(s,len)); + } + } + } + + void addReply(redisClient *c, robj *obj) { + if (_installWriteEvent(c) != REDIS_OK) return; + redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY); + + /* This is an important place where we can avoid copy-on-write + * when there is a saving child running, avoiding touching the + * refcount field of the object if it's not needed. + * + * If the encoding is RAW and there is room in the static buffer + * we'll be able to send the object to the client without + * messing with its page. */ + if (obj->encoding == REDIS_ENCODING_RAW) { + if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) + _addReplyObjectToList(c,obj); + } else { + obj = getDecodedObject(obj); + if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) + _addReplyObjectToList(c,obj); + decrRefCount(obj); + } + } + + void addReplySds(redisClient *c, sds s) { + if (_installWriteEvent(c) != REDIS_OK) { + /* The caller expects the sds to be free'd. */ + sdsfree(s); return; } - buf[0] = ':'; + if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) { + sdsfree(s); + } else { + /* This method free's the sds when it is no longer needed. */ + _addReplySdsToList(c,s); + } + } + + void addReplyString(redisClient *c, char *s, size_t len) { + if (_installWriteEvent(c) != REDIS_OK) return; + if (_addReplyToBuffer(c,s,len) != REDIS_OK) + _addReplyStringToList(c,s,len); + } + + void _addReplyError(redisClient *c, char *s, size_t len) { + addReplyString(c,"-ERR ",5); + addReplyString(c,s,len); + addReplyString(c,"\r\n",2); + } + + void addReplyError(redisClient *c, char *err) { + _addReplyError(c,err,strlen(err)); + } + + void addReplyErrorFormat(redisClient *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + _addReplyError(c,s,sdslen(s)); + sdsfree(s); + } + + void _addReplyStatus(redisClient *c, char *s, size_t len) { + addReplyString(c,"+",1); + addReplyString(c,s,len); + addReplyString(c,"\r\n",2); + } + + void addReplyStatus(redisClient *c, char *status) { + _addReplyStatus(c,status,strlen(status)); + } + + void addReplyStatusFormat(redisClient *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + _addReplyStatus(c,s,sdslen(s)); + sdsfree(s); + } + + /* Adds an empty object to the reply list that will contain the multi bulk + * length, which is not known when this function is called. */ + void *addDeferredMultiBulkLength(redisClient *c) { + /* Note that we install the write event here even if the object is not + * ready to be sent, since we are sure that before returning to the + * event loop setDeferredMultiBulkLength() will be called. */ + if (_installWriteEvent(c) != REDIS_OK) return NULL; + listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL)); + return listLast(c->reply); + } + + /* Populate the length object and try glueing it to the next chunk. */ + void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { + listNode *ln = (listNode*)node; + robj *len, *next; + + /* Abort when *node is NULL (see addDeferredMultiBulkLength). */ + if (node == NULL) return; + + len = listNodeValue(ln); + len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length); + if (ln->next != NULL) { + next = listNodeValue(ln->next); + + /* Only glue when the next node is non-NULL (an sds in this case) */ + if (next->ptr != NULL) { + len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr)); + listDelNode(c->reply,ln->next); + } + } + } + + void addReplyDouble(redisClient *c, double d) { + char dbuf[128], sbuf[128]; + int dlen, slen; + dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); + slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf); + addReplyString(c,sbuf,slen); + } + + void _addReplyLongLong(redisClient *c, long long ll, char prefix) { + char buf[128]; + int len; + buf[0] = prefix; len = ll2string(buf+1,sizeof(buf)-1,ll); buf[len+1] = '\r'; buf[len+2] = '\n'; - addReplySds(c,sdsnewlen(buf,len+3)); + addReplyString(c,buf,len+3); } - void addReplyUlong(redisClient *c, unsigned long ul) { - char buf[128]; - size_t len; + void addReplyLongLong(redisClient *c, long long ll) { + _addReplyLongLong(c,ll,':'); + } - if (ul == 0) { - addReply(c,shared.czero); - return; - } else if (ul == 1) { - addReply(c,shared.cone); - return; - } - len = snprintf(buf,sizeof(buf),":%lu\r\n",ul); - addReplySds(c,sdsnewlen(buf,len)); + void addReplyMultiBulkLen(redisClient *c, long length) { + _addReplyLongLong(c,length,'*'); } void addReplyBulkLen(redisClient *c, robj *obj) { - size_t len, intlen; - char buf[128]; + size_t len; if (obj->encoding == REDIS_ENCODING_RAW) { len = sdslen(obj->ptr); @@@ -133,11 -316,7 +316,7 @@@ len++; } } - buf[0] = '$'; - intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len); - buf[intlen+1] = '\r'; - buf[intlen+2] = '\n'; - addReplySds(c,sdsnewlen(buf,intlen+3)); + _addReplyLongLong(c,len,'$'); } void addReplyBulk(redisClient *c, robj *obj) { @@@ -157,11 -336,23 +336,11 @@@ void addReplyBulkCString(redisClient *c } } -void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cport, cfd; - char cip[128]; +static void acceptCommonHandler(int fd) { redisClient *c; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - REDIS_NOTUSED(privdata); - - cfd = anetAccept(server.neterr, fd, cip, &cport); - if (cfd == AE_ERR) { - redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); - return; - } - redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); - if ((c = createClient(cfd)) == NULL) { + if ((c = createClient(fd)) == NULL) { redisLog(REDIS_WARNING,"Error allocating resoures for the client"); - close(cfd); /* May be already closed, just ingore errors */ + close(fd); /* May be already closed, just ingore errors */ return; } /* If maxclient directive is set and this is one client more... close the @@@ -181,38 -372,6 +360,38 @@@ server.stat_numconnections++; } +void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cport, cfd; + char cip[128]; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetTcpAccept(server.neterr, fd, cip, &cport); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); + acceptCommonHandler(cfd); +} + +void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cfd; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetUnixAccept(server.neterr, fd); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket); + acceptCommonHandler(cfd); +} + + static void freeClientArgv(redisClient *c) { int j; @@@ -275,7 -434,8 +454,8 @@@ void freeClient(redisClient *c) server.vm_blocked_clients--; } listRelease(c->io_keys); - /* Master/slave cleanup */ + /* Master/slave cleanup. + * Case 1: we lost the connection with a slave. */ if (c->flags & REDIS_SLAVE) { if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) close(c->repldbfd); @@@ -284,9 -444,20 +464,20 @@@ redisAssert(ln != NULL); listDelNode(l,ln); } + + /* Case 2: we lost the connection with the master. */ if (c->flags & REDIS_MASTER) { server.master = NULL; server.replstate = REDIS_REPL_CONNECT; + /* Since we lost the connection with the master, we should also + * close the connection with all our slaves if we have any, so + * when we'll resync with the master the other slaves will sync again + * with us as well. Note that also when the slave is not connected + * to the master it will keep refusing connections by other slaves. */ + while (listLength(server.slaves)) { + ln = listFirst(server.slaves); + freeClient((redisClient*)ln->value); + } } /* Release memory */ zfree(c->argv); @@@ -295,34 -466,6 +486,6 @@@ zfree(c); } - #define GLUEREPLY_UP_TO (1024) - static void glueReplyBuffersIfNeeded(redisClient *c) { - int copylen = 0; - char buf[GLUEREPLY_UP_TO]; - listNode *ln; - listIter li; - robj *o; - - listRewind(c->reply,&li); - while((ln = listNext(&li))) { - int objlen; - - o = ln->value; - objlen = sdslen(o->ptr); - if (copylen + objlen <= GLUEREPLY_UP_TO) { - memcpy(buf+copylen,o->ptr,objlen); - copylen += objlen; - listDelNode(c->reply,ln); - } else { - if (copylen == 0) return; - break; - } - } - /* Now the output buffer is empty, add the new single element */ - o = createObject(REDIS_STRING,sdsnewlen(buf,copylen)); - listAddNodeHead(c->reply,o); - } - void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = privdata; int nwritten = 0, totwritten = 0, objlen; @@@ -339,31 -482,48 +502,48 @@@ return; } - while(listLength(c->reply)) { - if (server.glueoutputbuf && listLength(c->reply) > 1) - glueReplyBuffersIfNeeded(c); + while(c->bufpos > 0 || listLength(c->reply)) { + if (c->bufpos > 0) { + if (c->flags & REDIS_MASTER) { + /* Don't reply to a master */ + nwritten = c->bufpos - c->sentlen; + } else { + nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); + if (nwritten <= 0) break; + } + c->sentlen += nwritten; + totwritten += nwritten; + + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. */ + if (c->sentlen == c->bufpos) { + c->bufpos = 0; + c->sentlen = 0; + } + } else { + o = listNodeValue(listFirst(c->reply)); + objlen = sdslen(o->ptr); - o = listNodeValue(listFirst(c->reply)); - objlen = sdslen(o->ptr); + if (objlen == 0) { + listDelNode(c->reply,listFirst(c->reply)); + continue; + } - if (objlen == 0) { - listDelNode(c->reply,listFirst(c->reply)); - continue; - } + if (c->flags & REDIS_MASTER) { + /* Don't reply to a master */ + nwritten = objlen - c->sentlen; + } else { + nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); + if (nwritten <= 0) break; + } + c->sentlen += nwritten; + totwritten += nwritten; - if (c->flags & REDIS_MASTER) { - /* Don't reply to a master */ - nwritten = objlen - c->sentlen; - } else { - nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen); - if (nwritten <= 0) break; - } - c->sentlen += nwritten; - totwritten += nwritten; - /* If we fully sent the object on head go to the next one */ - if (c->sentlen == objlen) { - listDelNode(c->reply,listFirst(c->reply)); - c->sentlen = 0; + /* If we fully sent the object on head go to the next one */ + if (c->sentlen == objlen) { + listDelNode(c->reply,listFirst(c->reply)); + c->sentlen = 0; + } } /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT * bytes, in a single threaded server it's a good idea to serve @@@ -472,6 -632,7 +652,7 @@@ void resetClient(redisClient *c) freeClientArgv(c); c->bulklen = -1; c->multibulk = 0; + c->newline = NULL; } void closeTimedoutClients(void) { @@@ -486,6 -647,7 +667,7 @@@ if (server.maxidletime && !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ !(c->flags & REDIS_MASTER) && /* no timeout for masters */ + !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */ dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */ listLength(c->pubsub_patterns) == 0 && (now - c->lastinteraction > server.maxidletime)) @@@ -502,6 -664,8 +684,8 @@@ } void processInputBuffer(redisClient *c) { + int seeknewline = 0; + again: /* Before to process the input buffer, make sure the client is not * waitig for a blocking operation such as BLPOP. Note that the first @@@ -510,15 -674,19 +694,19 @@@ * in the input buffer the client may be blocked, and the "goto again" * will try to reiterate. The following line will make it return asap. */ if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; + + if (seeknewline && c->bulklen == -1) c->newline = strchr(c->querybuf,'\n'); + seeknewline = 1; if (c->bulklen == -1) { /* Read the first line of the query */ - char *p = strchr(c->querybuf,'\n'); size_t querylen; - if (p) { + if (c->newline) { + char *p = c->newline; sds query, *argv; int argc, j; + c->newline = NULL; query = c->querybuf; c->querybuf = sdsempty(); querylen = 1+(p-(query)); @@@ -605,8 -773,14 +793,14 @@@ void readQueryFromClient(aeEventLoop *e return; } if (nread) { + size_t oldlen = sdslen(c->querybuf); c->querybuf = sdscatlen(c->querybuf, buf, nread); c->lastinteraction = time(NULL); + /* Scan this new piece of the query for the newline. We do this + * here in order to make sure we perform this scan just one time + * per piece of buffer, leading to an O(N) scan instead of O(N*N) */ + if (c->bulklen == -1 && c->newline == NULL) + c->newline = strchr(c->querybuf+oldlen,'\n'); } else { return; } diff --combined src/rdb.c index 3fa284e1,a401a5b9..589b536a --- a/src/rdb.c +++ b/src/rdb.c @@@ -260,17 -260,29 +260,29 @@@ int rdbSaveObject(FILE *fp, robj *o) } } else if (o->type == REDIS_SET) { /* Save a set value */ - dict *set = o->ptr; - dictIterator *di = dictGetIterator(set); - dictEntry *de; - - if (rdbSaveLen(fp,dictSize(set)) == -1) return -1; - while((de = dictNext(di)) != NULL) { - robj *eleobj = dictGetEntryKey(de); + if (o->encoding == REDIS_ENCODING_HT) { + dict *set = o->ptr; + dictIterator *di = dictGetIterator(set); + dictEntry *de; - if (rdbSaveStringObject(fp,eleobj) == -1) return -1; + if (rdbSaveLen(fp,dictSize(set)) == -1) return -1; + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetEntryKey(de); + if (rdbSaveStringObject(fp,eleobj) == -1) return -1; + } + dictReleaseIterator(di); + } else if (o->encoding == REDIS_ENCODING_INTSET) { + intset *is = o->ptr; + int64_t llval; + int i = 0; + + if (rdbSaveLen(fp,intsetLen(is)) == -1) return -1; + while(intsetGet(is,i++,&llval)) { + if (rdbSaveLongLongAsStringObject(fp,llval) == -1) return -1; + } + } else { + redisPanic("Unknown set encoding"); } - dictReleaseIterator(di); } else if (o->type == REDIS_ZSET) { /* Save a set value */ zset *zs = o->ptr; @@@ -445,11 -457,11 +457,12 @@@ int rdbSaveBackground(char *filename) if (server.bgsavechildpid != -1) return REDIS_ERR; if (server.vm_enabled) waitEmptyIOJobsQueue(); + server.dirty_before_bgsave = server.dirty; if ((childpid = fork()) == 0) { /* Child */ if (server.vm_enabled) vmReopenSwapFile(); - close(server.fd); + if (server.ipfd > 0) close(server.ipfd); + if (server.sofd > 0) close(server.sofd); if (rdbSave(filename) == REDIS_OK) { _exit(0); } else { @@@ -629,6 -641,7 +642,7 @@@ int rdbLoadDoubleValue(FILE *fp, doubl robj *rdbLoadObject(int type, FILE *fp) { robj *o, *ele, *dec; size_t len; + unsigned int i; redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp)); if (type == REDIS_STRING) { @@@ -670,16 -683,41 +684,41 @@@ } else if (type == REDIS_SET) { /* Read list/set value */ if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; - o = createSetObject(); - /* It's faster to expand the dict to the right size asap in order - * to avoid rehashing */ - if (len > DICT_HT_INITIAL_SIZE) - dictExpand(o->ptr,len); + + /* Use a regular set when there are too many entries. */ + if (len > server.set_max_intset_entries) { + o = createSetObject(); + /* It's faster to expand the dict to the right size asap in order + * to avoid rehashing */ + if (len > DICT_HT_INITIAL_SIZE) + dictExpand(o->ptr,len); + } else { + o = createIntsetObject(); + } + /* Load every single element of the list/set */ - while(len--) { + for (i = 0; i < len; i++) { + long long llval; if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; ele = tryObjectEncoding(ele); - dictAdd((dict*)o->ptr,ele,NULL); + + if (o->encoding == REDIS_ENCODING_INTSET) { + /* Fetch integer value from element */ + if (isObjectRepresentableAsLongLong(ele,&llval) == REDIS_OK) { + o->ptr = intsetAdd(o->ptr,llval,NULL); + } else { + setTypeConvert(o,REDIS_ENCODING_HT); + dictExpand(o->ptr,len); + } + } + + /* This will also be called when the set was just converted + * to regular hashtable encoded set */ + if (o->encoding == REDIS_ENCODING_HT) { + dictAdd((dict*)o->ptr,ele,NULL); + } else { + decrRefCount(ele); + } } } else if (type == REDIS_ZSET) { /* Read list/set value */ @@@ -692,13 -730,14 +731,14 @@@ /* Load every single element of the list/set */ while(zsetlen--) { robj *ele; - double *score = zmalloc(sizeof(double)); + double score; + zskiplistNode *znode; if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; ele = tryObjectEncoding(ele); - if (rdbLoadDoubleValue(fp,score) == -1) return NULL; - dictAdd(zs->dict,ele,score); - zslInsert(zs->zsl,*score,ele); + if (rdbLoadDoubleValue(fp,&score) == -1) return NULL; + znode = zslInsert(zs->zsl,score,ele); + dictAdd(zs->dict,ele,&znode->score); incrRefCount(ele); /* added to skiplist */ } } else if (type == REDIS_HASH) { @@@ -876,7 -915,7 +916,7 @@@ void backgroundSaveDoneHandler(int stat if (!bysignal && exitcode == 0) { redisLog(REDIS_NOTICE, "Background saving terminated with success"); - server.dirty = 0; + server.dirty = server.dirty - server.dirty_before_bgsave; server.lastsave = time(NULL); } else if (!bysignal && exitcode != 0) { redisLog(REDIS_WARNING, "Background saving error"); diff --combined src/redis-benchmark.c index b3e729f1,297ecc6c..68c46ad8 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@@ -71,11 -71,11 +71,12 @@@ static struct config aeEventLoop *el; char *hostip; int hostport; + char *hostsocket; int keepalive; long long start; long long totlatency; int *latency; + char *title; list *clients; int quiet; int loop; @@@ -207,16 -207,27 +208,27 @@@ static void clientDone(client c) } } + /* Read a length from the buffer pointed to by *p, store the length in *len, + * and return the number of bytes that the cursor advanced. */ + static int readLen(char *p, int *len) { + char *tail = strstr(p,"\r\n"); + if (tail == NULL) + return 0; + *tail = '\0'; + *len = atoi(p+1); + return tail+2-p; + } + static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - char buf[1024]; - int nread; + char buf[1024], *p; + int nread, pos=0, len=0; client c = privdata; REDIS_NOTUSED(el); REDIS_NOTUSED(fd); REDIS_NOTUSED(mask); - nread = read(c->fd, buf, 1024); + nread = read(c->fd,buf,sizeof(buf)); if (nread == -1) { fprintf(stderr, "Reading from socket: %s\n", strerror(errno)); freeClient(c); @@@ -229,82 -240,89 +241,89 @@@ } c->totreceived += nread; c->ibuf = sdscatlen(c->ibuf,buf,nread); + len = sdslen(c->ibuf); - processdata: - /* Are we waiting for the first line of the command of for sdf - * count in bulk or multi bulk operations? */ if (c->replytype == REPLY_INT || - c->replytype == REPLY_RETCODE || - (c->replytype == REPLY_BULK && c->readlen == -1) || - (c->replytype == REPLY_MBULK && c->readlen == -1) || - (c->replytype == REPLY_MBULK && c->mbulk == -1)) { - char *p; - - /* Check if the first line is complete. This is only true if - * there is a newline inside the buffer. */ - if ((p = strchr(c->ibuf,'\n')) != NULL) { - if (c->replytype == REPLY_BULK || - (c->replytype == REPLY_MBULK && c->mbulk != -1)) - { - /* Read the count of a bulk reply (being it a single bulk or - * a multi bulk reply). "$" for the protocol spec. */ - *p = '\0'; - *(p-1) = '\0'; - c->readlen = atoi(c->ibuf+1)+2; - // printf("BULK ATOI: %s\n", c->ibuf+1); - /* Handle null bulk reply "$-1" */ - if (c->readlen-2 == -1) { - clientDone(c); - return; - } - /* Leave all the rest in the input buffer */ - c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1); - /* fall through to reach the point where the code will try - * to check if the bulk reply is complete. */ - } else if (c->replytype == REPLY_MBULK && c->mbulk == -1) { - /* Read the count of a multi bulk reply. That is, how many - * bulk replies we have to read next. "*" protocol. */ - *p = '\0'; - *(p-1) = '\0'; - c->mbulk = atoi(c->ibuf+1); - /* Handle null bulk reply "*-1" */ - if (c->mbulk == -1) { - clientDone(c); - return; + c->replytype == REPLY_RETCODE) + { + /* Check if the first line is complete. This is everything we need + * when waiting for an integer or status code reply.*/ + if ((p = strstr(c->ibuf,"\r\n")) != NULL) + goto done; + } else if (c->replytype == REPLY_BULK) { + int advance = 0; + if (c->readlen < 0) { + advance = readLen(c->ibuf+pos,&c->readlen); + if (advance) { + pos += advance; + if (c->readlen == -1) { + goto done; + } else { + /* include the trailing \r\n */ + c->readlen += 2; } - // printf("%p) %d elements list\n", c, c->mbulk); - /* Leave all the rest in the input buffer */ - c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1); - goto processdata; } else { - c->ibuf = sdstrim(c->ibuf,"\r\n"); - clientDone(c); - return; + goto skip; } } - } - /* bulk read, did we read everything? */ - if (((c->replytype == REPLY_MBULK && c->mbulk != -1) || - (c->replytype == REPLY_BULK)) && c->readlen != -1 && - (unsigned)c->readlen <= sdslen(c->ibuf)) - { - // printf("BULKSTATUS mbulk:%d readlen:%d sdslen:%d\n", - // c->mbulk,c->readlen,sdslen(c->ibuf)); - if (c->replytype == REPLY_BULK) { - clientDone(c); - } else if (c->replytype == REPLY_MBULK) { - // printf("%p) %d (%d)) ",c, c->mbulk, c->readlen); - // fwrite(c->ibuf,c->readlen,1,stdout); - // printf("\n"); - if (--c->mbulk == 0) { - clientDone(c); + + int canconsume; + if (c->readlen > 0) { + canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen; + c->readlen -= canconsume; + pos += canconsume; + } + + if (c->readlen == 0) + goto done; + } else if (c->replytype == REPLY_MBULK) { + int advance = 0; + if (c->mbulk == -1) { + advance = readLen(c->ibuf+pos,&c->mbulk); + if (advance) { + pos += advance; + if (c->mbulk == -1) + goto done; + } else { + goto skip; + } + } + + int canconsume; + while(c->mbulk > 0 && pos < len) { + if (c->readlen > 0) { + canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen; + c->readlen -= canconsume; + pos += canconsume; + if (c->readlen == 0) + c->mbulk--; } else { - c->ibuf = sdsrange(c->ibuf,c->readlen,-1); - c->readlen = -1; - goto processdata; + advance = readLen(c->ibuf+pos,&c->readlen); + if (advance) { + pos += advance; + if (c->readlen == -1) { + c->mbulk--; + continue; + } else { + /* include the trailing \r\n */ + c->readlen += 2; + } + } else { + goto skip; + } } } + + if (c->mbulk == 0) + goto done; } + + skip: + c->ibuf = sdsrange(c->ibuf,pos,-1); + return; + done: + clientDone(c); + return; } static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) @@@ -341,11 -359,7 +360,11 @@@ static client createClient(void) client c = zmalloc(sizeof(struct _client)); char err[ANET_ERR_LEN]; - c->fd = anetTcpNonBlockConnect(err,config.hostip,config.hostport); + if (config.hostsocket == NULL) + c->fd = anetTcpNonBlockConnect(err,config.hostip,config.hostport); + else + c->fd = anetUnixNonBlockConnect(err,config.hostsocket); + if (c->fd == ANET_ERR) { zfree(c); fprintf(stderr,"Connect: %s\n",err); @@@ -376,13 -390,13 +395,13 @@@ static void createMissingClients(clien } } - static void showLatencyReport(char *title) { + static void showLatencyReport(void) { int j, seen = 0; float perc, reqpersec; reqpersec = (float)config.donerequests/((float)config.totlatency/1000); if (!config.quiet) { - printf("====== %s ======\n", title); + printf("====== %s ======\n", config.title); printf(" %d requests completed in %.2f seconds\n", config.donerequests, (float)config.totlatency/1000); printf(" %d parallel clients\n", config.numclients); @@@ -398,20 -412,20 +417,20 @@@ } printf("%.2f requests per second\n\n", reqpersec); } else { - printf("%s: %.2f requests per second\n", title, reqpersec); + printf("%s: %.2f requests per second\n", config.title, reqpersec); } } - static void prepareForBenchmark(void) - { + static void prepareForBenchmark(char *title) { memset(config.latency,0,sizeof(int)*(MAX_LATENCY+1)); + config.title = title; config.start = mstime(); config.donerequests = 0; } - static void endBenchmark(char *title) { + static void endBenchmark(void) { config.totlatency = mstime()-config.start; - showLatencyReport(title); + showLatencyReport(); freeAllClients(); } @@@ -441,9 -455,6 +460,9 @@@ void parseOptions(int argc, char **argv } else if (!strcmp(argv[i],"-p") && !lastarg) { config.hostport = atoi(argv[i+1]); i++; + } else if (!strcmp(argv[i],"-s") && !lastarg) { + config.hostsocket = argv[i+1]; + i++; } else if (!strcmp(argv[i],"-d") && !lastarg) { config.datasize = atoi(argv[i+1]); i++; @@@ -467,8 -478,7 +486,8 @@@ printf("Wrong option '%s' or option argument missing\n\n",argv[i]); printf("Usage: redis-benchmark [-h ] [-p ] [-c ] [-n [-k ]\n\n"); printf(" -h Server hostname (default 127.0.0.1)\n"); - printf(" -p Server port (default 6379)\n"); + printf(" -p Server port (default 6379)\n"); + printf(" -s Server socket (overrides host and port)\n"); printf(" -c Number of parallel connections (default 50)\n"); printf(" -n Total number of requests (default 10000)\n"); printf(" -d Data size of SET/GET value in bytes (default 2)\n"); @@@ -489,6 -499,18 +508,18 @@@ } } + int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) { + REDIS_NOTUSED(eventLoop); + REDIS_NOTUSED(id); + REDIS_NOTUSED(clientData); + + float dt = (float)(mstime()-config.start)/1000.0; + float rps = (float)config.donerequests/dt; + printf("%s: %.2f\r", config.title, rps); + fflush(stdout); + return 250; /* every 250ms */ + } + int main(int argc, char **argv) { client c; @@@ -500,6 -522,7 +531,7 @@@ config.requests = 10000; config.liveclients = 0; config.el = aeCreateEventLoop(); + aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL); config.keepalive = 1; config.donerequests = 0; config.datasize = 3; @@@ -514,7 -537,6 +546,7 @@@ config.hostip = "127.0.0.1"; config.hostport = 6379; + config.hostsocket = NULL; parseOptions(argc,argv); @@@ -524,7 -546,7 +556,7 @@@ if (config.idlemode) { printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); - prepareForBenchmark(); + prepareForBenchmark("IDLE"); c = createClient(); if (!c) exit(1); c->obuf = sdsempty(); @@@ -535,25 -557,25 +567,25 @@@ } do { - prepareForBenchmark(); + prepareForBenchmark("PING"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"PING\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("PING"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("PING (multi bulk)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"*1\r\n$4\r\nPING\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("PING (multi bulk)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("SET"); c = createClient(); if (!c) exit(1); c->obuf = sdscatprintf(c->obuf,"SET foo_rand000000000000 %d\r\n",config.datasize); @@@ -567,106 -589,106 +599,106 @@@ prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("SET"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("GET"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"GET foo_rand000000000000\r\n"); prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); - endBenchmark("GET"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("INCR"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"INCR counter_rand000000000000\r\n"); prepareClientForReply(c,REPLY_INT); createMissingClients(c); aeMain(config.el); - endBenchmark("INCR"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LPUSH"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n"); prepareClientForReply(c,REPLY_INT); createMissingClients(c); aeMain(config.el); - endBenchmark("LPUSH"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LPOP"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPOP mylist\r\n"); prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LPOP"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("SADD"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"SADD myset 24\r\ncounter_rand000000000000\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("SADD"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("SPOP"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"SPOP myset\r\n"); prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); - endBenchmark("SPOP"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LPUSH (again, in order to bench LRANGE)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n"); prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); - endBenchmark("LPUSH (again, in order to bench LRANGE)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 100 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 99\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 100 elements)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 300 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 299\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 300 elements)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 450 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 449\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 450 elements)"); + endBenchmark(); - prepareForBenchmark(); + prepareForBenchmark("LRANGE (first 600 elements)"); c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LRANGE mylist 0 599\r\n"); prepareClientForReply(c,REPLY_MBULK); createMissingClients(c); aeMain(config.el); - endBenchmark("LRANGE (first 600 elements)"); + endBenchmark(); printf("\n"); } while(config.loop); diff --combined src/redis-cli.c index 4dafba32,5071604b..8866678b --- a/src/redis-cli.c +++ b/src/redis-cli.c @@@ -36,6 -36,8 +36,8 @@@ #include #include #include + #include + #include #include "anet.h" #include "sds.h" @@@ -52,15 -54,16 +54,17 @@@ static struct config { char *hostip; int hostport; + char *hostsocket; long repeat; int dbnum; - int argn_from_stdin; int interactive; int shutdown; int monitor_mode; int pubsub_mode; - int raw_output; + int raw_output; /* output mode per command */ + int tty; /* flag for default output format */ + int stdinarg; /* get last arg from stdin. (-x option) */ + char mb_sep; char *auth; char *historyfile; } config; @@@ -68,26 -71,17 +72,25 @@@ static int cliReadReply(int fd); static void usage(); - static int cliConnect(void) { + /* Connect to the client. If force is not zero the connection is performed + * even if there is already a connected socket. */ + static int cliConnect(int force) { char err[ANET_ERR_LEN]; static int fd = ANET_ERR; - if (fd == ANET_ERR) { + if (fd == ANET_ERR || force) { + if (force) close(fd); - fd = anetTcpConnect(err,config.hostip,config.hostport); + if (config.hostsocket == NULL) { + fd = anetTcpConnect(err,config.hostip,config.hostport); + } else { + fd = anetUnixConnect(err,config.hostsocket); - if (fd == ANET_ERR) { - fprintf(stderr, "Could not connect to Redis at %s: %s", config.hostsocket, err); - return -1; - } + } if (fd == ANET_ERR) { - fprintf(stderr, "Could not connect to Redis at %s:%d: %s", config.hostip, config.hostport, err); + fprintf(stderr,"Could not connect to Redis at "); + if (config.hostsocket == NULL) + fprintf(stderr,"%s:%d: %s",config.hostip,config.hostport,err); + else + fprintf(stderr,"%s: %s",config.hostsocket,err); return -1; } anetTcpNoDelay(NULL,fd); @@@ -103,7 -97,7 +106,7 @@@ static sds cliReadLine(int fd) ssize_t ret; ret = read(fd,&c,1); - if (ret == -1) { + if (ret <= 0) { sdsfree(line); return NULL; } else if ((ret == 0) || (c == '\n')) { @@@ -120,7 -114,7 +123,7 @@@ static int cliReadSingleLineReply(int f if (reply == NULL) return 1; if (!quiet) - printf("%s\n", reply); + printf("%s", reply); sdsfree(reply); return 0; } @@@ -147,7 -141,7 +150,7 @@@ static void printStringRepr(char *s, in } s++; } - printf("\"\n"); + printf("\""); } static int cliReadBulkReply(int fd) { @@@ -165,7 -159,7 +168,7 @@@ reply = zmalloc(bulklen); anetRead(fd,reply,bulklen); anetRead(fd,crlf,2); - if (config.raw_output || !isatty(fileno(stdout))) { + if (config.raw_output || !config.tty) { if (bulklen && fwrite(reply,bulklen,1,stdout) == 0) { zfree(reply); return 1; @@@ -182,6 -176,7 +185,7 @@@ static int cliReadMultiBulkReply(int fd) { sds replylen = cliReadLine(fd); int elements, c = 1; + int retval = 0; if (replylen == NULL) return 1; elements = atoi(replylen); @@@ -194,36 -189,45 +198,45 @@@ printf("(empty list or set)\n"); } while(elements--) { - printf("%d. ", c); - if (cliReadReply(fd)) return 1; + if (config.tty) printf("%d. ", c); + if (cliReadReply(fd)) retval = 1; + if (elements) printf("%c",config.mb_sep); c++; } - return 0; + return retval; } static int cliReadReply(int fd) { char type; + int nread; - if (anetRead(fd,&type,1) <= 0) { + if ((nread = anetRead(fd,&type,1)) <= 0) { if (config.shutdown) return 0; - exit(1); + if (config.interactive && + (nread == 0 || (nread == -1 && errno == ECONNRESET))) + { + return ECONNRESET; + } else { + printf("I/O error while reading from socket: %s",strerror(errno)); + exit(1); + } } switch(type) { case '-': - printf("(error) "); + if (config.tty) printf("(error) "); cliReadSingleLineReply(fd,0); return 1; case '+': return cliReadSingleLineReply(fd,0); case ':': - printf("(integer) "); + if (config.tty) printf("(integer) "); return cliReadSingleLineReply(fd,0); case '$': return cliReadBulkReply(fd); case '*': return cliReadMultiBulkReply(fd); default: - printf("protocol error, got '%c' as reply type byte\n", type); + printf("protocol error, got '%c' as reply type byte", type); return 1; } } @@@ -248,17 -252,37 +261,37 @@@ static int selectDb(int fd) return 0; } + static void showInteractiveHelp(void) { + printf( + "\n" + "Welcome to redis-cli " REDIS_VERSION "!\n" + "Just type any valid Redis command to see a pretty printed output.\n" + "\n" + "It is possible to quote strings, like in:\n" + " set \"my key\" \"some string \\xff\\n\"\n" + "\n" + "You can find a list of valid Redis commands at\n" + " http://code.google.com/p/redis/wiki/CommandReference\n" + "\n" + "Note: redis-cli supports line editing, use up/down arrows for history." + "\n\n"); + } + static int cliSendCommand(int argc, char **argv, int repeat) { char *command = argv[0]; int fd, j, retval = 0; sds cmd; config.raw_output = !strcasecmp(command,"info"); + if (!strcasecmp(command,"help")) { + showInteractiveHelp(); + return 0; + } if (!strcasecmp(command,"shutdown")) config.shutdown = 1; if (!strcasecmp(command,"monitor")) config.monitor_mode = 1; if (!strcasecmp(command,"subscribe") || !strcasecmp(command,"psubscribe")) config.pubsub_mode = 1; - if ((fd = cliConnect()) == -1) return 1; + if ((fd = cliConnect(0)) == -1) return 1; /* Select db number */ retval = selectDb(fd); @@@ -279,21 -303,21 +312,21 @@@ while(repeat--) { anetWrite(fd,cmd,sdslen(cmd)); while (config.monitor_mode) { - cliReadSingleLineReply(fd,0); + if (cliReadSingleLineReply(fd,0)) exit(1); + printf("\n"); } if (config.pubsub_mode) { printf("Reading messages... (press Ctrl-c to quit)\n"); while (1) { cliReadReply(fd); - printf("\n"); + printf("\n\n"); } } retval = cliReadReply(fd); - if (retval) { - return retval; - } + if (!config.raw_output && config.tty) printf("\n"); + if (retval) return retval; } return 0; } @@@ -314,12 -338,11 +347,14 @@@ static int parseOptions(int argc, char i++; } else if (!strcmp(argv[i],"-h") && lastarg) { usage(); + } else if (!strcmp(argv[i],"-x")) { + config.stdinarg = 1; } else if (!strcmp(argv[i],"-p") && !lastarg) { config.hostport = atoi(argv[i+1]); i++; + } else if (!strcmp(argv[i],"-s") && !lastarg) { + config.hostsocket = argv[i+1]; + i++; } else if (!strcmp(argv[i],"-r") && !lastarg) { config.repeat = strtoll(argv[i+1],NULL,10); i++; @@@ -330,11 -353,18 +365,18 @@@ config.auth = argv[i+1]; i++; } else if (!strcmp(argv[i],"-i")) { - config.interactive = 1; + fprintf(stderr, + "Starting interactive mode using -i is deprecated. Interactive mode is started\n" + "by default when redis-cli is executed without a command to execute.\n" + ); } else if (!strcmp(argv[i],"-c")) { - config.argn_from_stdin = 1; + fprintf(stderr, + "Reading last argument from standard input using -c is deprecated.\n" + "When standard input is connected to a pipe or regular file, it is\n" + "automatically used as last argument.\n" + ); } else if (!strcmp(argv[i],"-v")) { - printf("redis-cli shipped with Redis verison %s\n", REDIS_VERSION); + printf("redis-cli shipped with Redis version %s\n", REDIS_VERSION); exit(0); } else { break; @@@ -361,10 -391,9 +403,9 @@@ static sds readArgFromStdin(void) } static void usage() { - fprintf(stderr, "usage: redis-cli [-iv] [-h host] [-p port] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 arg3 ... argN\n"); + fprintf(stderr, "usage: redis-cli [-iv] [-h host] [-p port] [-s /path/to/socket] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 arg3 ... argN\n"); - fprintf(stderr, "usage: echo \"argN\" | redis-cli -c [-h host] [-p port] [-s /path/to/socket] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 ... arg(N-1)\n"); - fprintf(stderr, "\nIf a pipe from standard input is detected this data is used as last argument.\n\n"); - fprintf(stderr, "example: cat /etc/passwd | redis-cli set my_passwd\n"); + fprintf(stderr, "usage: echo \"argN\" | redis-cli -x [options] cmd arg1 arg2 ... arg(N-1)\n\n"); + fprintf(stderr, "example: cat /etc/passwd | redis-cli -x set my_passwd\n"); fprintf(stderr, "example: redis-cli get my_passwd\n"); fprintf(stderr, "example: redis-cli -r 100 lpush mylist x\n"); fprintf(stderr, "\nRun in interactive mode: redis-cli -i or just don't pass any command\n"); @@@ -382,87 -411,39 +423,39 @@@ static char **convertToSds(int count, c return sds; } - static char **splitArguments(char *line, int *argc) { - char *p = line; - char *current = NULL; - char **vector = NULL; - - *argc = 0; - while(1) { - /* skip blanks */ - while(*p && isspace(*p)) p++; - if (*p) { - /* get a token */ - int inq=0; /* set to 1 if we are in "quotes" */ - int done = 0; - - if (current == NULL) current = sdsempty(); - while(!done) { - if (inq) { - if (*p == '\\' && *(p+1)) { - char c; - - p++; - switch(*p) { - case 'n': c = '\n'; break; - case 'r': c = '\r'; break; - case 't': c = '\t'; break; - case 'b': c = '\b'; break; - case 'a': c = '\a'; break; - default: c = *p; break; - } - current = sdscatlen(current,&c,1); - } else if (*p == '"') { - done = 1; - } else { - current = sdscatlen(current,p,1); - } - } else { - switch(*p) { - case ' ': - case '\n': - case '\r': - case '\t': - case '\0': - done=1; - break; - case '"': - inq=1; - break; - default: - current = sdscatlen(current,p,1); - break; - } - } - if (*p) p++; - } - /* add the token to the vector */ - vector = zrealloc(vector,((*argc)+1)*sizeof(char*)); - vector[*argc] = current; - (*argc)++; - current = NULL; - } else { - return vector; - } - } - } - #define LINE_BUFLEN 4096 static void repl() { int argc, j; - char *line, **argv; + char *line; + sds *argv; + config.interactive = 1; while((line = linenoise("redis> ")) != NULL) { if (line[0] != '\0') { - argv = splitArguments(line,&argc); + argv = sdssplitargs(line,&argc); linenoiseHistoryAdd(line); if (config.historyfile) linenoiseHistorySave(config.historyfile); - if (argc > 0) { + if (argv == NULL) { + printf("Invalid argument(s)\n"); + continue; + } else if (argc > 0) { if (strcasecmp(argv[0],"quit") == 0 || strcasecmp(argv[0],"exit") == 0) - exit(0); - else - cliSendCommand(argc, argv, 1); + { + exit(0); + } else { + int err; + + if ((err = cliSendCommand(argc, argv, 1)) != 0) { + if (err == ECONNRESET) { + printf("Reconnecting... "); + fflush(stdout); + if (cliConnect(1) == -1) exit(1); + printf("OK\n"); + cliSendCommand(argc,argv,1); + } + } + } } /* Free the argument vector */ for (j = 0; j < argc; j++) @@@ -475,23 -456,36 +468,37 @@@ exit(0); } + static int noninteractive(int argc, char **argv) { + int retval = 0; + if (config.stdinarg) { + argv = zrealloc(argv, (argc+1)*sizeof(char*)); + argv[argc] = readArgFromStdin(); + retval = cliSendCommand(argc+1, argv, config.repeat); + } else { + /* stdin is probably a tty, can be tested with S_ISCHR(s.st_mode) */ + retval = cliSendCommand(argc, argv, config.repeat); + } + return retval; + } + int main(int argc, char **argv) { int firstarg; - char **argvcopy; config.hostip = "127.0.0.1"; config.hostport = 6379; + config.hostsocket = NULL; config.repeat = 1; config.dbnum = 0; - config.argn_from_stdin = 0; - config.shutdown = 0; config.interactive = 0; + config.shutdown = 0; config.monitor_mode = 0; config.pubsub_mode = 0; config.raw_output = 0; + config.stdinarg = 0; config.auth = NULL; config.historyfile = NULL; + config.tty = isatty(fileno(stdout)) || (getenv("FAKETTY") != NULL); + config.mb_sep = '\n'; if (getenv("HOME") != NULL) { config.historyfile = malloc(256); @@@ -505,19 -499,20 +512,20 @@@ if (config.auth != NULL) { char *authargv[2]; + int dbnum = config.dbnum; + /* We need to save the real configured database number and set it to + * zero here, otherwise cliSendCommand() will try to perform the + * SELECT command before the authentication, and it will fail. */ + config.dbnum = 0; authargv[0] = "AUTH"; authargv[1] = config.auth; cliSendCommand(2, convertToSds(2, authargv), 1); + config.dbnum = dbnum; /* restore the right DB number */ } - if (argc == 0 || config.interactive == 1) repl(); - - argvcopy = convertToSds(argc+1, argv); - if (config.argn_from_stdin) { - sds lastarg = readArgFromStdin(); - argvcopy[argc] = lastarg; - argc++; - } - return cliSendCommand(argc, argvcopy, config.repeat); + /* Start interactive mode when no command is provided */ + if (argc == 0) repl(); + /* Otherwise, we have some arguments to execute */ + return noninteractive(argc,convertToSds(argc,argv)); } diff --combined src/redis.c index 0e9b73b7,27a855d9..50cf2f6c --- a/src/redis.c +++ b/src/redis.c @@@ -51,6 -51,7 +51,7 @@@ #include #include #include + #include /* Our shared "common" objects */ @@@ -170,6 -171,7 +171,7 @@@ struct redisCommand readonlyCommandTabl {"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, + {"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0}, {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0}, @@@ -338,7 -340,7 +340,7 @@@ dictType zsetDictType = NULL, /* val dup */ dictEncObjKeyCompare, /* key compare */ dictRedisObjectDestructor, /* key destructor */ - dictVanillaFree /* val destructor of malloc(sizeof(double)) */ + NULL /* val destructor */ }; /* Db->dict, keys are sds strings, vals are Redis objects. */ @@@ -435,6 -437,48 +437,48 @@@ void updateDictResizePolicy(void) /* ======================= Cron: called every 100 ms ======================== */ + /* Try to expire a few timed out keys. The algorithm used is adaptive and + * will use few CPU cycles if there are few expiring keys, otherwise + * it will get more aggressive to avoid that too much memory is used by + * keys that can be removed from the keyspace. */ + void activeExpireCycle(void) { + int j; + + for (j = 0; j < server.dbnum; j++) { + int expired; + redisDb *db = server.db+j; + + /* Continue to expire if at the end of the cycle more than 25% + * of the keys were expired. */ + do { + long num = dictSize(db->expires); + time_t now = time(NULL); + + expired = 0; + if (num > REDIS_EXPIRELOOKUPS_PER_CRON) + num = REDIS_EXPIRELOOKUPS_PER_CRON; + while (num--) { + dictEntry *de; + time_t t; + + if ((de = dictGetRandomKey(db->expires)) == NULL) break; + t = (time_t) dictGetEntryVal(de); + if (now > t) { + sds key = dictGetEntryKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + + propagateExpire(db,keyobj); + dbDelete(db,keyobj); + decrRefCount(keyobj); + expired++; + server.stat_expiredkeys++; + } + } + } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); + } + } + + int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); @@@ -533,41 -577,10 +577,10 @@@ } } - /* Try to expire a few timed out keys. The algorithm used is adaptive and - * will use few CPU cycles if there are few expiring keys, otherwise - * it will get more aggressive to avoid that too much memory is used by - * keys that can be removed from the keyspace. */ - for (j = 0; j < server.dbnum; j++) { - int expired; - redisDb *db = server.db+j; - - /* Continue to expire if at the end of the cycle more than 25% - * of the keys were expired. */ - do { - long num = dictSize(db->expires); - time_t now = time(NULL); - - expired = 0; - if (num > REDIS_EXPIRELOOKUPS_PER_CRON) - num = REDIS_EXPIRELOOKUPS_PER_CRON; - while (num--) { - dictEntry *de; - time_t t; - - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - t = (time_t) dictGetEntryVal(de); - if (now > t) { - sds key = dictGetEntryKey(de); - robj *keyobj = createStringObject(key,sdslen(key)); - - dbDelete(db,keyobj); - decrRefCount(keyobj); - expired++; - server.stat_expiredkeys++; - } - } - } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); - } + /* Expire a few keys per cycle, only if this is a master. + * On slaves we wait for DEL operations synthesized by the master + * in order to guarantee a strict consistency. */ + if (server.masterhost == NULL) activeExpireCycle(); /* Swap a few keys on disk if we are over the memory limit and VM * is enbled. Try to free objects from the free list first. */ @@@ -696,16 -709,13 +709,16 @@@ void createSharedObjects(void) } void initServerConfig() { - server.dbnum = REDIS_DEFAULT_DBNUM; server.port = REDIS_SERVERPORT; + server.bindaddr = NULL; + server.unixsocket = NULL; + server.ipfd = -1; + server.sofd = -1; + server.dbnum = REDIS_DEFAULT_DBNUM; server.verbosity = REDIS_VERBOSE; server.maxidletime = REDIS_MAXIDLETIME; server.saveparams = NULL; server.logfile = NULL; /* NULL = log on standard output */ - server.bindaddr = NULL; server.glueoutputbuf = 1; server.daemonize = 0; server.appendonly = 0; @@@ -734,6 -744,7 +747,7 @@@ server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE; server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES; server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE; + server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES; server.shutdown_asap = 0; resetServerSaveParams(); @@@ -776,21 -787,9 +790,21 @@@ void initServer() createSharedObjects(); server.el = aeCreateEventLoop(); server.db = zmalloc(sizeof(redisDb)*server.dbnum); - server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr); - if (server.fd == -1) { - redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr); + server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr); + if (server.ipfd == ANET_ERR) { + redisLog(REDIS_WARNING, "Opening port: %s", server.neterr); + exit(1); + } + if (server.unixsocket != NULL) { + unlink(server.unixsocket); /* don't care if this fails */ + server.sofd = anetUnixServer(server.neterr,server.unixsocket); + if (server.sofd == ANET_ERR) { + redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); + exit(1); + } + } + if (server.ipfd < 0 && server.sofd < 0) { + redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } for (j = 0; j < server.dbnum; j++) { @@@ -819,10 -818,8 +833,10 @@@ server.stat_starttime = time(NULL); server.unixtime = time(NULL); aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); - if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, - acceptHandler, NULL) == AE_ERR) oom("creating file event"); + if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, + acceptTcpHandler,NULL) == AE_ERR) oom("creating file event"); + if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, + acceptUnixHandler,NULL) == AE_ERR) oom("creating file event"); if (server.appendonly) { server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); @@@ -892,9 -889,6 +906,6 @@@ void call(redisClient *c, struct redisC int processCommand(redisClient *c) { struct redisCommand *cmd; - /* Free some memory if needed (maxmemory setting) */ - if (server.maxmemory) freeMemoryIfNeeded(); - /* Handle the multi bulk command type. This is an alternative protocol * supported by Redis in order to receive commands that are composed of * multiple binary-safe "bulk" arguments. The latency of processing is @@@ -913,15 -907,20 +924,20 @@@ } else if (c->multibulk) { if (c->bulklen == -1) { if (((char*)c->argv[0]->ptr)[0] != '$') { - addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n")); + addReplyError(c,"multi bulk protocol error"); resetClient(c); return 1; } else { - int bulklen = atoi(((char*)c->argv[0]->ptr)+1); + char *eptr; + long bulklen = strtol(((char*)c->argv[0]->ptr)+1,&eptr,10); + int perr = eptr[0] != '\0'; + decrRefCount(c->argv[0]); - if (bulklen < 0 || bulklen > 1024*1024*1024) { + if (perr || bulklen == LONG_MIN || bulklen == LONG_MAX || + bulklen < 0 || bulklen > 1024*1024*1024) + { c->argc--; - addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); + addReplyError(c,"invalid bulk write count"); resetClient(c); return 1; } @@@ -974,27 -973,28 +990,28 @@@ * such wrong arity, bad command name and so forth. */ cmd = lookupCommand(c->argv[0]->ptr); if (!cmd) { - addReplySds(c, - sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n", - (char*)c->argv[0]->ptr)); + addReplyErrorFormat(c,"unknown command '%s'", + (char*)c->argv[0]->ptr); resetClient(c); return 1; } else if ((cmd->arity > 0 && cmd->arity != c->argc) || (c->argc < -cmd->arity)) { - addReplySds(c, - sdscatprintf(sdsempty(), - "-ERR wrong number of arguments for '%s' command\r\n", - cmd->name)); + addReplyErrorFormat(c,"wrong number of arguments for '%s' command", + cmd->name); resetClient(c); return 1; } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) { /* This is a bulk command, we have to read the last argument yet. */ - int bulklen = atoi(c->argv[c->argc-1]->ptr); + char *eptr; + long bulklen = strtol(c->argv[c->argc-1]->ptr,&eptr,10); + int perr = eptr[0] != '\0'; decrRefCount(c->argv[c->argc-1]); - if (bulklen < 0 || bulklen > 1024*1024*1024) { + if (perr || bulklen == LONG_MAX || bulklen == LONG_MIN || + bulklen < 0 || bulklen > 1024*1024*1024) + { c->argc--; - addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n")); + addReplyError(c,"invalid bulk write count"); resetClient(c); return 1; } @@@ -1021,16 -1021,21 +1038,21 @@@ /* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && cmd->proc != authCommand) { - addReplySds(c,sdsnew("-ERR operation not permitted\r\n")); + addReplyError(c,"operation not permitted"); resetClient(c); return 1; } - /* Handle the maxmemory directive */ + /* Handle the maxmemory directive. + * + * First we try to free some memory if possible (if there are volatile + * keys in the dataset). If there are not the only thing we can do + * is returning an error. */ + if (server.maxmemory) freeMemoryIfNeeded(); if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) && zmalloc_used_memory() > server.maxmemory) { - addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n")); + addReplyError(c,"command not allowed when used memory > 'maxmemory'"); resetClient(c); return 1; } @@@ -1040,7 -1045,7 +1062,7 @@@ && cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand && cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) { - addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n")); + addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); resetClient(c); return 1; } @@@ -1081,11 -1086,7 +1103,7 @@@ int prepareForShutdown() if (server.vm_enabled) unlink(server.vm_swap_file); } else { /* Snapshotting. Perform a SYNC SAVE and exit */ - if (rdbSave(server.dbfilename) == REDIS_OK) { - if (server.daemonize) - unlink(server.pidfile); - redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); - } else { + if (rdbSave(server.dbfilename) != REDIS_OK) { /* Ooops.. error saving! The best we can do is to continue * operating. Note that if there was a background saving process, * in the next cron() Redis will be notified that the background @@@ -1095,6 -1096,7 +1113,7 @@@ return REDIS_ERR; } } + if (server.daemonize) unlink(server.pidfile); redisLog(REDIS_WARNING,"Server exit now, bye bye..."); return REDIS_OK; } @@@ -1107,7 -1109,7 +1126,7 @@@ void authCommand(redisClient *c) addReply(c,shared.ok); } else { c->authenticated = 0; - addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n")); + addReplyError(c,"invalid password"); } } @@@ -1148,6 -1150,10 +1167,10 @@@ sds genRedisInfoString(void) time_t uptime = time(NULL)-server.stat_starttime; int j; char hmem[64]; + struct rusage self_ru, c_ru; + + getrusage(RUSAGE_SELF, &self_ru); + getrusage(RUSAGE_CHILDREN, &c_ru); bytesToHuman(hmem,zmalloc_used_memory()); info = sdscatprintf(sdsempty(), @@@ -1159,11 -1165,16 +1182,16 @@@ "process_id:%ld\r\n" "uptime_in_seconds:%ld\r\n" "uptime_in_days:%ld\r\n" + "used_cpu_sys:%.2f\r\n" + "used_cpu_user:%.2f\r\n" + "used_cpu_sys_childrens:%.2f\r\n" + "used_cpu_user_childrens:%.2f\r\n" "connected_clients:%d\r\n" "connected_slaves:%d\r\n" "blocked_clients:%d\r\n" "used_memory:%zu\r\n" "used_memory_human:%s\r\n" + "mem_fragmentation_ratio:%.2f\r\n" "changes_since_last_save:%lld\r\n" "bgsave_in_progress:%d\r\n" "last_save_time:%ld\r\n" @@@ -1185,11 -1196,16 +1213,16 @@@ (long) getpid(), uptime, uptime/(3600*24), + (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000, + (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000, + (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000, + (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000, listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), server.blpop_blocked_clients, zmalloc_used_memory(), hmem, + zmalloc_get_fragmentation_ratio(), server.dirty, server.bgsavechildpid != -1, server.lastsave, @@@ -1319,7 -1335,8 +1352,8 @@@ void freeMemoryIfNeeded(void) if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue; for (j = 0; j < server.dbnum; j++) { int minttl = -1; - robj *minkey = NULL; + sds minkey = NULL; + robj *keyobj = NULL; struct dictEntry *de; if (dictSize(server.db[j].expires)) { @@@ -1336,7 -1353,10 +1370,10 @@@ minttl = t; } } - dbDelete(server.db+j,minkey); + keyobj = createStringObject(minkey,sdslen(minkey)); + dbDelete(server.db+j,keyobj); + server.stat_expiredkeys++; + decrRefCount(keyobj); } } if (!freed) return; /* nothing to free... */ @@@ -1367,9 -1387,17 +1404,17 @@@ void linuxOvercommitMemoryWarning(void } #endif /* __linux__ */ + void createPidFile(void) { + /* Try to write the pid file in a best-effort way. */ + FILE *fp = fopen(server.pidfile,"w"); + if (fp) { + fprintf(fp,"%d\n",getpid()); + fclose(fp); + } + } + void daemonize(void) { int fd; - FILE *fp; if (fork() != 0) exit(0); /* parent exits */ setsid(); /* create a new session */ @@@ -1383,12 -1411,6 +1428,6 @@@ dup2(fd, STDERR_FILENO); if (fd > STDERR_FILENO) close(fd); } - /* Try to write the pid file */ - fp = fopen(server.pidfile,"w"); - if (fp) { - fprintf(fp,"%d\n",getpid()); - fclose(fp); - } } void version() { @@@ -1421,6 -1443,7 +1460,7 @@@ int main(int argc, char **argv) } if (server.daemonize) daemonize(); initServer(); + if (server.daemonize) createPidFile(); redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION); #ifdef __linux__ linuxOvercommitMemoryWarning(); @@@ -1433,10 -1456,7 +1473,10 @@@ if (rdbLoad(server.dbfilename) == REDIS_OK) redisLog(REDIS_NOTICE,"DB loaded from disk: %ld seconds",time(NULL)-start); } - redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); + if (server.ipfd > 0) + redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); + if (server.sofd > 0) + redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); aeDeleteEventLoop(server.el); @@@ -1500,6 -1520,7 +1540,7 @@@ void segvHandler(int sig, siginfo_t *in redisLog(REDIS_WARNING,"%s", messages[i]); /* free(messages); Don't call free() with possibly corrupted memory. */ + if (server.daemonize) unlink(server.pidfile); _exit(0); } diff --combined src/redis.h index 38f0c140,3e9fc236..8e05a4d4 --- a/src/redis.h +++ b/src/redis.h @@@ -26,6 -26,7 +26,7 @@@ #include "anet.h" /* Networking the easy way */ #include "zipmap.h" /* Compact string -> string data structure */ #include "ziplist.h" /* Compact list data structure */ + #include "intset.h" /* Compact integer set structure */ #include "version.h" /* Error codes */ @@@ -46,6 -47,7 +47,7 @@@ #define REDIS_MAX_WRITE_PER_EVENT (1024*64) #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */ #define REDIS_SHARED_INTEGERS 10000 + #define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */ /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */ #define REDIS_WRITEV_THRESHOLD 3 @@@ -82,6 -84,7 +84,7 @@@ #define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ #define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */ #define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ + #define REDIS_ENCODING_INTSET 6 /* Encoded as intset */ /* Object types only used for dumping to disk */ #define REDIS_EXPIRETIME 253 @@@ -188,6 -191,7 +191,7 @@@ #define REDIS_HASH_MAX_ZIPMAP_VALUE 512 #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024 #define REDIS_LIST_MAX_ZIPLIST_VALUE 32 + #define REDIS_SET_MAX_INTSET_ENTRIES 4096 /* Sets operations codes */ #define REDIS_OP_UNION 0 @@@ -282,8 -286,9 +286,9 @@@ typedef struct redisClient int dictid; sds querybuf; robj **argv, **mbargv; + char *newline; /* pointing to the detected newline in querybuf */ int argc, mbargc; - int bulklen; /* bulk read len. -1 if not in bulk read mode */ + long bulklen; /* bulk read len. -1 if not in bulk read mode */ int multibulk; /* multi bulk command format active */ list *reply; int sentlen; @@@ -306,6 -311,10 +311,10 @@@ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ + + /* Response buffer */ + int bufpos; + char buf[REDIS_REPLY_CHUNK_BYTES]; } redisClient; struct saveparam { @@@ -329,12 -338,10 +338,13 @@@ struct sharedObjectsStruct struct redisServer { pthread_t mainthread; int port; - int fd; + char *bindaddr; + char *unixsocket; + int ipfd; + int sofd; redisDb *db; long long dirty; /* changes to DB from the last save */ + long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ list *clients; list *slaves, *monitors; char neterr[ANET_ERR_LEN]; @@@ -368,6 -375,7 +378,6 @@@ struct saveparam *saveparams; int saveparamslen; char *logfile; - char *bindaddr; char *dbfilename; char *appendfilename; char *requirepass; @@@ -400,6 -408,7 +410,7 @@@ size_t hash_max_zipmap_value; size_t list_max_ziplist_entries; size_t list_max_ziplist_value; + size_t set_max_intset_entries; /* Virtual memory state */ FILE *vm_fp; int vm_fd; @@@ -482,13 -491,14 +493,14 @@@ typedef struct _redisSortOperation } redisSortOperation; /* ZSETs use a specialized version of Skiplists */ - typedef struct zskiplistNode { - struct zskiplistNode **forward; - struct zskiplistNode *backward; - unsigned int *span; - double score; robj *obj; + double score; + struct zskiplistNode *backward; + struct zskiplistLevel { + struct zskiplistNode *forward; + unsigned int span; + } level[]; } zskiplistNode; typedef struct zskiplist { @@@ -537,6 -547,14 +549,14 @@@ typedef struct listNode *ln; /* Entry in linked list */ } listTypeEntry; + /* Structure to hold set iteration abstraction. */ + typedef struct { + robj *subject; + int encoding; + int ii; /* intset iterator */ + dictIterator *di; + } setTypeIterator; + /* Structure to hold hash iteration abstration. Note that iteration over * hashes involves both fields and values. Because it is possible that * not both are required, store pointers in the iterator to avoid @@@ -577,21 -595,34 +597,35 @@@ void resetClient(redisClient *c) void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); + void *addDeferredMultiBulkLength(redisClient *c); + void setDeferredMultiBulkLength(redisClient *c, void *node, long length); void addReplySds(redisClient *c, sds s); void processInputBuffer(redisClient *c); -void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); +void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); +void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); void addReplyBulk(redisClient *c, robj *obj); void addReplyBulkCString(redisClient *c, char *s); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); void addReplySds(redisClient *c, sds s); + void addReplyError(redisClient *c, char *err); + void addReplyStatus(redisClient *c, char *status); void addReplyDouble(redisClient *c, double d); void addReplyLongLong(redisClient *c, long long ll); - void addReplyUlong(redisClient *c, unsigned long ul); + void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); + #ifdef __GNUC__ + void addReplyErrorFormat(redisClient *c, const char *fmt, ...) + __attribute__((format(printf, 2, 3))); + void addReplyStatusFormat(redisClient *c, const char *fmt, ...) + __attribute__((format(printf, 2, 3))); + #else + void addReplyErrorFormat(redisClient *c, const char *fmt, ...); + void addReplyStatusFormat(redisClient *c, const char *fmt, ...); + #endif + /* List data type */ void listTypeTryConversion(robj *subject, robj *value); void listTypePush(robj *subject, robj *value, int where); @@@ -636,6 -667,7 +670,7 @@@ robj *createStringObjectFromLongLong(lo robj *createListObject(void); robj *createZiplistObject(void); robj *createSetObject(void); + robj *createIntsetObject(void); robj *createHashObject(void); robj *createZsetObject(void); int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg); @@@ -677,7 -709,7 +712,7 @@@ void backgroundRewriteDoneHandler(int s /* Sorted sets data type */ zskiplist *zslCreate(void); void zslFree(zskiplist *zsl); - void zslInsert(zskiplist *zsl, double score, robj *obj); + zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj); /* Core functions */ void freeMemoryIfNeeded(void); @@@ -719,6 -751,18 +754,18 @@@ int dontWaitForSwappedKey(redisClient * void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); vmpointer *vmSwapObjectBlocking(robj *val); + /* Set data type */ + robj *setTypeCreate(robj *value); + int setTypeAdd(robj *subject, robj *value); + int setTypeRemove(robj *subject, robj *value); + int setTypeIsMember(robj *subject, robj *value); + setTypeIterator *setTypeInitIterator(robj *subject); + void setTypeReleaseIterator(setTypeIterator *si); + robj *setTypeNext(setTypeIterator *si); + robj *setTypeRandomElement(robj *subject); + unsigned long setTypeSize(robj *subject); + void setTypeConvert(robj *subject, int enc); + /* Hash data type */ void convertToRealHash(robj *o); void hashTypeTryConversion(robj *subject, robj **argv, int start, int end); @@@ -747,6 -791,8 +794,8 @@@ int stringmatch(const char *pattern, co long long memtoll(const char *p, int *err); int ll2string(char *s, size_t len, long long value); int isStringRepresentableAsLong(sds s, long *longval); + int isStringRepresentableAsLongLong(sds s, long long *longval); + int isObjectRepresentableAsLongLong(robj *o, long long *llongval); /* Configuration */ void loadServerConfig(char *filename); @@@ -755,10 -801,10 +804,10 @@@ void resetServerSaveParams() /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); + void propagateExpire(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key); - int deleteIfVolatile(redisDb *db, robj *key); time_t getExpire(redisDb *db, robj *key); - int setExpire(redisDb *db, robj *key, time_t when); + void setExpire(redisDb *db, robj *key, time_t when); robj *lookupKey(redisDb *db, robj *key); robj *lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); @@@ -841,6 -887,7 +890,7 @@@ void expireCommand(redisClient *c) void expireatCommand(redisClient *c); void getsetCommand(redisClient *c); void ttlCommand(redisClient *c); + void persistCommand(redisClient *c); void slaveofCommand(redisClient *c); void debugCommand(redisClient *c); void msetCommand(redisClient *c);