c->querybuf = sdsempty();
c->argc = 0;
c->argv = NULL;
+ c->bufpos = 0;
c->flags = 0;
/* We set the fake client as a slave waiting for the synchronization
* so that Redis will not try to send replies to this client. */
fakeClient->argc = argc;
fakeClient->argv = argv;
cmd->proc(fakeClient);
- /* Discard the reply objects list from the fake client */
- while(listLength(fakeClient->reply))
- listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
+
+ /* The fake client should not have a reply */
+ redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
+
/* Clean up, ready for the next command */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
zfree(argv);
+
/* Handle swapping while loading big datasets when VM is on */
force_swapout = 0;
if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
}
exit(1);
fmterr:
- redisLog(REDIS_WARNING,"Bad file format reading the append only file");
+ redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
exit(1);
}
redisPanic("Unknown list encoding");
}
} else if (o->type == REDIS_SET) {
- /* Emit the SADDs needed to rebuild the set */
- dict *set = o->ptr;
- dictIterator *di = dictGetIterator(set);
- dictEntry *de;
+ char cmd[]="*3\r\n$4\r\nSADD\r\n";
- while((de = dictNext(di)) != NULL) {
- char cmd[]="*3\r\n$4\r\nSADD\r\n";
- robj *eleobj = dictGetEntryKey(de);
-
- if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
- if (fwriteBulkObject(fp,&key) == 0) goto werr;
- if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+ /* Emit the SADDs needed to rebuild the set */
+ if (o->encoding == REDIS_ENCODING_INTSET) {
+ int ii = 0;
+ int64_t llval;
+ while(intsetGet(o->ptr,ii++,&llval)) {
+ if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+ if (fwriteBulkObject(fp,&key) == 0) goto werr;
+ if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
+ }
+ } else if (o->encoding == REDIS_ENCODING_HT) {
+ dictIterator *di = dictGetIterator(o->ptr);
+ dictEntry *de;
+ while((de = dictNext(di)) != NULL) {
+ robj *eleobj = dictGetEntryKey(de);
+ if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+ if (fwriteBulkObject(fp,&key) == 0) goto werr;
+ if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+ }
+ dictReleaseIterator(di);
+ } else {
+ redisPanic("Unknown set encoding");
}
- dictReleaseIterator(di);
} else if (o->type == REDIS_ZSET) {
/* Emit the ZADDs needed to rebuild the sorted set */
zset *zs = o->ptr;
char tmpfile[256];
if (server.vm_enabled) vmReopenSwapFile();
- close(server.fd);
+ if (server.ipfd > 0) close(server.ipfd);
+ if (server.sofd > 0) close(server.sofd);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
_exit(0);
void bgrewriteaofCommand(redisClient *c) {
if (server.bgrewritechildpid != -1) {
- addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
+ addReplyError(c,"Background append only file rewriting already in progress");
return;
}
if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
- char *status = "+Background append only file rewriting started\r\n";
- addReplySds(c,sdsnew(status));
+ addReplyStatus(c,"Background append only file rewriting started");
} else {
addReply(c,shared.err);
}
}
} else if (!strcasecmp(argv[0],"bind") && argc == 2) {
server.bindaddr = zstrdup(argv[1]);
+ } else if (!strcasecmp(argv[0],"unixsocket") && argc == 2) {
+ server.unixsocket = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"save") && argc == 3) {
int seconds = atoi(argv[1]);
int changes = atoi(argv[2]);
server.list_max_ziplist_entries = memtoll(argv[1], NULL);
} else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2){
server.list_max_ziplist_value = memtoll(argv[1], NULL);
+ } else if (!strcasecmp(argv[0],"set-max-intset-entries") && argc == 2){
+ server.set_max_intset_entries = memtoll(argv[1], NULL);
} else {
err = "Bad directive or wrong number of arguments"; goto loaderr;
}
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
ll < 0) goto badfmt;
server.maxmemory = ll;
+ if (server.maxmemory) freeMemoryIfNeeded();
} else if (!strcasecmp(c->argv[2]->ptr,"timeout")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
ll < 0 || ll > LONG_MAX) goto badfmt;
stopAppendOnly();
} else {
if (startAppendOnly() == REDIS_ERR) {
- addReplySds(c,sdscatprintf(sdsempty(),
- "-ERR Unable to turn on AOF. Check server logs.\r\n"));
+ addReplyError(c,
+ "Unable to turn on AOF. Check server logs.");
decrRefCount(o);
return;
}
}
sdsfreesplitres(v,vlen);
} else {
- addReplySds(c,sdscatprintf(sdsempty(),
- "-ERR not supported CONFIG parameter %s\r\n",
- (char*)c->argv[2]->ptr));
+ addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
+ (char*)c->argv[2]->ptr);
decrRefCount(o);
return;
}
return;
badfmt: /* Bad format errors */
- addReplySds(c,sdscatprintf(sdsempty(),
- "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n",
+ addReplyErrorFormat(c,"Invalid argument '%s' for CONFIG SET '%s'",
(char*)o->ptr,
- (char*)c->argv[2]->ptr));
+ (char*)c->argv[2]->ptr);
decrRefCount(o);
}
void configGetCommand(redisClient *c) {
robj *o = getDecodedObject(c->argv[2]);
- robj *lenobj = createObject(REDIS_STRING,NULL);
+ void *replylen = addDeferredMultiBulkLength(c);
char *pattern = o->ptr;
int matches = 0;
- addReply(c,lenobj);
- decrRefCount(lenobj);
-
if (stringmatch(pattern,"dbfilename",0)) {
addReplyBulkCString(c,"dbfilename");
addReplyBulkCString(c,server.dbfilename);
matches++;
}
decrRefCount(o);
- lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2);
+ setDeferredMultiBulkLength(c,replylen,matches*2);
}
void configCommand(redisClient *c) {
server.stat_starttime = time(NULL);
addReply(c,shared.ok);
} else {
- addReplySds(c,sdscatprintf(sdsempty(),
- "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n"));
+ addReplyError(c,
+ "CONFIG subcommand must be one of GET, SET, RESETSTAT");
}
return;
badarity:
- addReplySds(c,sdscatprintf(sdsempty(),
- "-ERR Wrong number of arguments for CONFIG %s\r\n",
- (char*) c->argv[1]->ptr));
+ addReplyErrorFormat(c,"Wrong number of arguments for CONFIG %s",
+ (char*) c->argv[1]->ptr);
}
#include "redis.h"
-
#include <sys/uio.h>
void *dupClientReplyValue(void *o) {
}
redisClient *createClient(int fd) {
- redisClient *c = zmalloc(sizeof(*c));
+ redisClient *c = zmalloc(sizeof(redisClient));
+ c->bufpos = 0;
anetNonBlock(NULL,fd);
anetTcpNoDelay(NULL,fd);
if (!c) return NULL;
+ if (aeCreateFileEvent(server.el,fd,AE_READABLE,
+ readQueryFromClient, c) == AE_ERR)
+ {
+ close(fd);
+ zfree(c);
+ return NULL;
+ }
+
selectDb(c,0);
c->fd = fd;
c->querybuf = sdsempty();
+ c->newline = NULL;
c->argc = 0;
c->argv = NULL;
c->bulklen = -1;
c->pubsub_patterns = listCreate();
listSetFreeMethod(c->pubsub_patterns,decrRefCount);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
- if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
- readQueryFromClient, c) == AE_ERR) {
- freeClient(c);
- return NULL;
- }
listAddNodeTail(server.clients,c);
initClientMultiState(c);
return c;
}
- void addReply(redisClient *c, robj *obj) {
- if (listLength(c->reply) == 0 &&
+ int _installWriteEvent(redisClient *c) {
+ if (c->fd <= 0) return REDIS_ERR;
+ if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
- sendReplyToClient, c) == AE_ERR) return;
+ sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
+ return REDIS_OK;
+ }
- if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
- obj = dupStringObject(obj);
- obj->refcount = 0; /* getDecodedObject() will increment the refcount */
+ /* Create a duplicate of the last object in the reply list when
+ * it is not exclusively owned by the reply list. */
+ robj *dupLastObjectIfNeeded(list *reply) {
+ robj *new, *cur;
+ listNode *ln;
+ redisAssert(listLength(reply) > 0);
+ ln = listLast(reply);
+ cur = listNodeValue(ln);
+ if (cur->refcount > 1) {
+ new = dupStringObject(cur);
+ decrRefCount(cur);
+ listNodeValue(ln) = new;
}
- listAddNodeTail(c->reply,getDecodedObject(obj));
+ return listNodeValue(ln);
}
- void addReplySds(redisClient *c, sds s) {
- robj *o = createObject(REDIS_STRING,s);
- addReply(c,o);
- decrRefCount(o);
+ int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
+ size_t available = sizeof(c->buf)-c->bufpos;
+
+ /* If there already are entries in the reply list, we cannot
+ * add anything more to the static buffer. */
+ if (listLength(c->reply) > 0) return REDIS_ERR;
+
+ /* Check that the buffer has enough space available for this string. */
+ if (len > available) return REDIS_ERR;
+
+ memcpy(c->buf+c->bufpos,s,len);
+ c->bufpos+=len;
+ return REDIS_OK;
}
- void addReplyDouble(redisClient *c, double d) {
- char buf[128];
+ void _addReplyObjectToList(redisClient *c, robj *o) {
+ robj *tail;
+ if (listLength(c->reply) == 0) {
+ incrRefCount(o);
+ listAddNodeTail(c->reply,o);
+ } else {
+ tail = listNodeValue(listLast(c->reply));
- snprintf(buf,sizeof(buf),"%.17g",d);
- addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
- (unsigned long) strlen(buf),buf));
+ /* Append to this object when possible. */
+ if (tail->ptr != NULL &&
+ sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
+ {
+ tail = dupLastObjectIfNeeded(c->reply);
+ tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
+ } else {
+ incrRefCount(o);
+ listAddNodeTail(c->reply,o);
+ }
+ }
}
- void addReplyLongLong(redisClient *c, long long ll) {
- char buf[128];
- size_t len;
+ /* This method takes responsibility over the sds. When it is no longer
+ * needed it will be free'd, otherwise it ends up in a robj. */
+ void _addReplySdsToList(redisClient *c, sds s) {
+ robj *tail;
+ if (listLength(c->reply) == 0) {
+ listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
+ } else {
+ tail = listNodeValue(listLast(c->reply));
- if (ll == 0) {
- addReply(c,shared.czero);
- return;
- } else if (ll == 1) {
- addReply(c,shared.cone);
+ /* Append to this object when possible. */
+ if (tail->ptr != NULL &&
+ sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
+ {
+ tail = dupLastObjectIfNeeded(c->reply);
+ tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
+ sdsfree(s);
+ } else {
+ listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
+ }
+ }
+ }
+
+ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
+ robj *tail;
+ if (listLength(c->reply) == 0) {
+ listAddNodeTail(c->reply,createStringObject(s,len));
+ } else {
+ tail = listNodeValue(listLast(c->reply));
+
+ /* Append to this object when possible. */
+ if (tail->ptr != NULL &&
+ sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
+ {
+ tail = dupLastObjectIfNeeded(c->reply);
+ tail->ptr = sdscatlen(tail->ptr,s,len);
+ } else {
+ listAddNodeTail(c->reply,createStringObject(s,len));
+ }
+ }
+ }
+
+ void addReply(redisClient *c, robj *obj) {
+ if (_installWriteEvent(c) != REDIS_OK) return;
+ redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);
+
+ /* This is an important place where we can avoid copy-on-write
+ * when there is a saving child running, avoiding touching the
+ * refcount field of the object if it's not needed.
+ *
+ * If the encoding is RAW and there is room in the static buffer
+ * we'll be able to send the object to the client without
+ * messing with its page. */
+ if (obj->encoding == REDIS_ENCODING_RAW) {
+ if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
+ _addReplyObjectToList(c,obj);
+ } else {
+ obj = getDecodedObject(obj);
+ if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
+ _addReplyObjectToList(c,obj);
+ decrRefCount(obj);
+ }
+ }
+
+ void addReplySds(redisClient *c, sds s) {
+ if (_installWriteEvent(c) != REDIS_OK) {
+ /* The caller expects the sds to be free'd. */
+ sdsfree(s);
return;
}
- buf[0] = ':';
+ if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
+ sdsfree(s);
+ } else {
+ /* This method free's the sds when it is no longer needed. */
+ _addReplySdsToList(c,s);
+ }
+ }
+
+ void addReplyString(redisClient *c, char *s, size_t len) {
+ if (_installWriteEvent(c) != REDIS_OK) return;
+ if (_addReplyToBuffer(c,s,len) != REDIS_OK)
+ _addReplyStringToList(c,s,len);
+ }
+
+ void _addReplyError(redisClient *c, char *s, size_t len) {
+ addReplyString(c,"-ERR ",5);
+ addReplyString(c,s,len);
+ addReplyString(c,"\r\n",2);
+ }
+
+ void addReplyError(redisClient *c, char *err) {
+ _addReplyError(c,err,strlen(err));
+ }
+
+ void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap,fmt);
+ sds s = sdscatvprintf(sdsempty(),fmt,ap);
+ va_end(ap);
+ _addReplyError(c,s,sdslen(s));
+ sdsfree(s);
+ }
+
+ void _addReplyStatus(redisClient *c, char *s, size_t len) {
+ addReplyString(c,"+",1);
+ addReplyString(c,s,len);
+ addReplyString(c,"\r\n",2);
+ }
+
+ void addReplyStatus(redisClient *c, char *status) {
+ _addReplyStatus(c,status,strlen(status));
+ }
+
+ void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap,fmt);
+ sds s = sdscatvprintf(sdsempty(),fmt,ap);
+ va_end(ap);
+ _addReplyStatus(c,s,sdslen(s));
+ sdsfree(s);
+ }
+
+ /* Adds an empty object to the reply list that will contain the multi bulk
+ * length, which is not known when this function is called. */
+ void *addDeferredMultiBulkLength(redisClient *c) {
+ /* Note that we install the write event here even if the object is not
+ * ready to be sent, since we are sure that before returning to the
+ * event loop setDeferredMultiBulkLength() will be called. */
+ if (_installWriteEvent(c) != REDIS_OK) return NULL;
+ listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
+ return listLast(c->reply);
+ }
+
+ /* Populate the length object and try glueing it to the next chunk. */
+ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
+ listNode *ln = (listNode*)node;
+ robj *len, *next;
+
+ /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
+ if (node == NULL) return;
+
+ len = listNodeValue(ln);
+ len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
+ if (ln->next != NULL) {
+ next = listNodeValue(ln->next);
+
+ /* Only glue when the next node is non-NULL (an sds in this case) */
+ if (next->ptr != NULL) {
+ len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
+ listDelNode(c->reply,ln->next);
+ }
+ }
+ }
+
+ void addReplyDouble(redisClient *c, double d) {
+ char dbuf[128], sbuf[128];
+ int dlen, slen;
+ dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
+ slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
+ addReplyString(c,sbuf,slen);
+ }
+
+ void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
+ char buf[128];
+ int len;
+ buf[0] = prefix;
len = ll2string(buf+1,sizeof(buf)-1,ll);
buf[len+1] = '\r';
buf[len+2] = '\n';
- addReplySds(c,sdsnewlen(buf,len+3));
+ addReplyString(c,buf,len+3);
}
- void addReplyUlong(redisClient *c, unsigned long ul) {
- char buf[128];
- size_t len;
+ void addReplyLongLong(redisClient *c, long long ll) {
+ _addReplyLongLong(c,ll,':');
+ }
- if (ul == 0) {
- addReply(c,shared.czero);
- return;
- } else if (ul == 1) {
- addReply(c,shared.cone);
- return;
- }
- len = snprintf(buf,sizeof(buf),":%lu\r\n",ul);
- addReplySds(c,sdsnewlen(buf,len));
+ void addReplyMultiBulkLen(redisClient *c, long length) {
+ _addReplyLongLong(c,length,'*');
}
void addReplyBulkLen(redisClient *c, robj *obj) {
- size_t len, intlen;
- char buf[128];
+ size_t len;
if (obj->encoding == REDIS_ENCODING_RAW) {
len = sdslen(obj->ptr);
len++;
}
}
- buf[0] = '$';
- intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
- buf[intlen+1] = '\r';
- buf[intlen+2] = '\n';
- addReplySds(c,sdsnewlen(buf,intlen+3));
+ _addReplyLongLong(c,len,'$');
}
void addReplyBulk(redisClient *c, robj *obj) {
}
}
-void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
- int cport, cfd;
- char cip[128];
+static void acceptCommonHandler(int fd) {
redisClient *c;
- REDIS_NOTUSED(el);
- REDIS_NOTUSED(mask);
- REDIS_NOTUSED(privdata);
-
- cfd = anetAccept(server.neterr, fd, cip, &cport);
- if (cfd == AE_ERR) {
- redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
- return;
- }
- redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
- if ((c = createClient(cfd)) == NULL) {
+ if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,"Error allocating resoures for the client");
- close(cfd); /* May be already closed, just ingore errors */
+ close(fd); /* May be already closed, just ingore errors */
return;
}
/* If maxclient directive is set and this is one client more... close the
server.stat_numconnections++;
}
+void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+ int cport, cfd;
+ char cip[128];
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(mask);
+ REDIS_NOTUSED(privdata);
+
+ cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
+ if (cfd == AE_ERR) {
+ redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
+ return;
+ }
+ redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
+ acceptCommonHandler(cfd);
+}
+
+void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+ int cfd;
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(mask);
+ REDIS_NOTUSED(privdata);
+
+ cfd = anetUnixAccept(server.neterr, fd);
+ if (cfd == AE_ERR) {
+ redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
+ return;
+ }
+ redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
+ acceptCommonHandler(cfd);
+}
+
+
static void freeClientArgv(redisClient *c) {
int j;
server.vm_blocked_clients--;
}
listRelease(c->io_keys);
- /* Master/slave cleanup */
+ /* Master/slave cleanup.
+ * Case 1: we lost the connection with a slave. */
if (c->flags & REDIS_SLAVE) {
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
close(c->repldbfd);
redisAssert(ln != NULL);
listDelNode(l,ln);
}
+
+ /* Case 2: we lost the connection with the master. */
if (c->flags & REDIS_MASTER) {
server.master = NULL;
server.replstate = REDIS_REPL_CONNECT;
+ /* Since we lost the connection with the master, we should also
+ * close the connection with all our slaves if we have any, so
+ * when we'll resync with the master the other slaves will sync again
+ * with us as well. Note that also when the slave is not connected
+ * to the master it will keep refusing connections by other slaves. */
+ while (listLength(server.slaves)) {
+ ln = listFirst(server.slaves);
+ freeClient((redisClient*)ln->value);
+ }
}
/* Release memory */
zfree(c->argv);
zfree(c);
}
- #define GLUEREPLY_UP_TO (1024)
- static void glueReplyBuffersIfNeeded(redisClient *c) {
- int copylen = 0;
- char buf[GLUEREPLY_UP_TO];
- listNode *ln;
- listIter li;
- robj *o;
-
- listRewind(c->reply,&li);
- while((ln = listNext(&li))) {
- int objlen;
-
- o = ln->value;
- objlen = sdslen(o->ptr);
- if (copylen + objlen <= GLUEREPLY_UP_TO) {
- memcpy(buf+copylen,o->ptr,objlen);
- copylen += objlen;
- listDelNode(c->reply,ln);
- } else {
- if (copylen == 0) return;
- break;
- }
- }
- /* Now the output buffer is empty, add the new single element */
- o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
- listAddNodeHead(c->reply,o);
- }
-
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata;
int nwritten = 0, totwritten = 0, objlen;
return;
}
- while(listLength(c->reply)) {
- if (server.glueoutputbuf && listLength(c->reply) > 1)
- glueReplyBuffersIfNeeded(c);
+ while(c->bufpos > 0 || listLength(c->reply)) {
+ if (c->bufpos > 0) {
+ if (c->flags & REDIS_MASTER) {
+ /* Don't reply to a master */
+ nwritten = c->bufpos - c->sentlen;
+ } else {
+ nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
+ if (nwritten <= 0) break;
+ }
+ c->sentlen += nwritten;
+ totwritten += nwritten;
+
+ /* If the buffer was sent, set bufpos to zero to continue with
+ * the remainder of the reply. */
+ if (c->sentlen == c->bufpos) {
+ c->bufpos = 0;
+ c->sentlen = 0;
+ }
+ } else {
+ o = listNodeValue(listFirst(c->reply));
+ objlen = sdslen(o->ptr);
- o = listNodeValue(listFirst(c->reply));
- objlen = sdslen(o->ptr);
+ if (objlen == 0) {
+ listDelNode(c->reply,listFirst(c->reply));
+ continue;
+ }
- if (objlen == 0) {
- listDelNode(c->reply,listFirst(c->reply));
- continue;
- }
+ if (c->flags & REDIS_MASTER) {
+ /* Don't reply to a master */
+ nwritten = objlen - c->sentlen;
+ } else {
+ nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
+ if (nwritten <= 0) break;
+ }
+ c->sentlen += nwritten;
+ totwritten += nwritten;
- if (c->flags & REDIS_MASTER) {
- /* Don't reply to a master */
- nwritten = objlen - c->sentlen;
- } else {
- nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
- if (nwritten <= 0) break;
- }
- c->sentlen += nwritten;
- totwritten += nwritten;
- /* If we fully sent the object on head go to the next one */
- if (c->sentlen == objlen) {
- listDelNode(c->reply,listFirst(c->reply));
- c->sentlen = 0;
+ /* If we fully sent the object on head go to the next one */
+ if (c->sentlen == objlen) {
+ listDelNode(c->reply,listFirst(c->reply));
+ c->sentlen = 0;
+ }
}
/* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
freeClientArgv(c);
c->bulklen = -1;
c->multibulk = 0;
+ c->newline = NULL;
}
void closeTimedoutClients(void) {
if (server.maxidletime &&
!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
!(c->flags & REDIS_MASTER) && /* no timeout for masters */
+ !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
listLength(c->pubsub_patterns) == 0 &&
(now - c->lastinteraction > server.maxidletime))
}
void processInputBuffer(redisClient *c) {
+ int seeknewline = 0;
+
again:
/* Before to process the input buffer, make sure the client is not
* waitig for a blocking operation such as BLPOP. Note that the first
* in the input buffer the client may be blocked, and the "goto again"
* will try to reiterate. The following line will make it return asap. */
if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
+
+ if (seeknewline && c->bulklen == -1) c->newline = strchr(c->querybuf,'\n');
+ seeknewline = 1;
if (c->bulklen == -1) {
/* Read the first line of the query */
- char *p = strchr(c->querybuf,'\n');
size_t querylen;
- if (p) {
+ if (c->newline) {
+ char *p = c->newline;
sds query, *argv;
int argc, j;
+ c->newline = NULL;
query = c->querybuf;
c->querybuf = sdsempty();
querylen = 1+(p-(query));
return;
}
if (nread) {
+ size_t oldlen = sdslen(c->querybuf);
c->querybuf = sdscatlen(c->querybuf, buf, nread);
c->lastinteraction = time(NULL);
+ /* Scan this new piece of the query for the newline. We do this
+ * here in order to make sure we perform this scan just one time
+ * per piece of buffer, leading to an O(N) scan instead of O(N*N) */
+ if (c->bulklen == -1 && c->newline == NULL)
+ c->newline = strchr(c->querybuf+oldlen,'\n');
} else {
return;
}
}
} else if (o->type == REDIS_SET) {
/* Save a set value */
- dict *set = o->ptr;
- dictIterator *di = dictGetIterator(set);
- dictEntry *de;
-
- if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
- while((de = dictNext(di)) != NULL) {
- robj *eleobj = dictGetEntryKey(de);
+ if (o->encoding == REDIS_ENCODING_HT) {
+ dict *set = o->ptr;
+ dictIterator *di = dictGetIterator(set);
+ dictEntry *de;
- if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
+ if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
+ while((de = dictNext(di)) != NULL) {
+ robj *eleobj = dictGetEntryKey(de);
+ if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
+ }
+ dictReleaseIterator(di);
+ } else if (o->encoding == REDIS_ENCODING_INTSET) {
+ intset *is = o->ptr;
+ int64_t llval;
+ int i = 0;
+
+ if (rdbSaveLen(fp,intsetLen(is)) == -1) return -1;
+ while(intsetGet(is,i++,&llval)) {
+ if (rdbSaveLongLongAsStringObject(fp,llval) == -1) return -1;
+ }
+ } else {
+ redisPanic("Unknown set encoding");
}
- dictReleaseIterator(di);
} else if (o->type == REDIS_ZSET) {
/* Save a set value */
zset *zs = o->ptr;
if (server.bgsavechildpid != -1) return REDIS_ERR;
if (server.vm_enabled) waitEmptyIOJobsQueue();
+ server.dirty_before_bgsave = server.dirty;
if ((childpid = fork()) == 0) {
/* Child */
if (server.vm_enabled) vmReopenSwapFile();
- close(server.fd);
+ if (server.ipfd > 0) close(server.ipfd);
+ if (server.sofd > 0) close(server.sofd);
if (rdbSave(filename) == REDIS_OK) {
_exit(0);
} else {
robj *rdbLoadObject(int type, FILE *fp) {
robj *o, *ele, *dec;
size_t len;
+ unsigned int i;
redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp));
if (type == REDIS_STRING) {
} else if (type == REDIS_SET) {
/* Read list/set value */
if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
- o = createSetObject();
- /* It's faster to expand the dict to the right size asap in order
- * to avoid rehashing */
- if (len > DICT_HT_INITIAL_SIZE)
- dictExpand(o->ptr,len);
+
+ /* Use a regular set when there are too many entries. */
+ if (len > server.set_max_intset_entries) {
+ o = createSetObject();
+ /* It's faster to expand the dict to the right size asap in order
+ * to avoid rehashing */
+ if (len > DICT_HT_INITIAL_SIZE)
+ dictExpand(o->ptr,len);
+ } else {
+ o = createIntsetObject();
+ }
+
/* Load every single element of the list/set */
- while(len--) {
+ for (i = 0; i < len; i++) {
+ long long llval;
if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
ele = tryObjectEncoding(ele);
- dictAdd((dict*)o->ptr,ele,NULL);
+
+ if (o->encoding == REDIS_ENCODING_INTSET) {
+ /* Fetch integer value from element */
+ if (isObjectRepresentableAsLongLong(ele,&llval) == REDIS_OK) {
+ o->ptr = intsetAdd(o->ptr,llval,NULL);
+ } else {
+ setTypeConvert(o,REDIS_ENCODING_HT);
+ dictExpand(o->ptr,len);
+ }
+ }
+
+ /* This will also be called when the set was just converted
+ * to regular hashtable encoded set */
+ if (o->encoding == REDIS_ENCODING_HT) {
+ dictAdd((dict*)o->ptr,ele,NULL);
+ } else {
+ decrRefCount(ele);
+ }
}
} else if (type == REDIS_ZSET) {
/* Read list/set value */
/* Load every single element of the list/set */
while(zsetlen--) {
robj *ele;
- double *score = zmalloc(sizeof(double));
+ double score;
+ zskiplistNode *znode;
if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
ele = tryObjectEncoding(ele);
- if (rdbLoadDoubleValue(fp,score) == -1) return NULL;
- dictAdd(zs->dict,ele,score);
- zslInsert(zs->zsl,*score,ele);
+ if (rdbLoadDoubleValue(fp,&score) == -1) return NULL;
+ znode = zslInsert(zs->zsl,score,ele);
+ dictAdd(zs->dict,ele,&znode->score);
incrRefCount(ele); /* added to skiplist */
}
} else if (type == REDIS_HASH) {
if (!bysignal && exitcode == 0) {
redisLog(REDIS_NOTICE,
"Background saving terminated with success");
- server.dirty = 0;
+ server.dirty = server.dirty - server.dirty_before_bgsave;
server.lastsave = time(NULL);
} else if (!bysignal && exitcode != 0) {
redisLog(REDIS_WARNING, "Background saving error");
aeEventLoop *el;
char *hostip;
int hostport;
+ char *hostsocket;
int keepalive;
long long start;
long long totlatency;
int *latency;
+ char *title;
list *clients;
int quiet;
int loop;
}
}
+ /* Read a length from the buffer pointed to by *p, store the length in *len,
+ * and return the number of bytes that the cursor advanced. */
+ static int readLen(char *p, int *len) {
+ char *tail = strstr(p,"\r\n");
+ if (tail == NULL)
+ return 0;
+ *tail = '\0';
+ *len = atoi(p+1);
+ return tail+2-p;
+ }
+
static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask)
{
- char buf[1024];
- int nread;
+ char buf[1024], *p;
+ int nread, pos=0, len=0;
client c = privdata;
REDIS_NOTUSED(el);
REDIS_NOTUSED(fd);
REDIS_NOTUSED(mask);
- nread = read(c->fd, buf, 1024);
+ nread = read(c->fd,buf,sizeof(buf));
if (nread == -1) {
fprintf(stderr, "Reading from socket: %s\n", strerror(errno));
freeClient(c);
}
c->totreceived += nread;
c->ibuf = sdscatlen(c->ibuf,buf,nread);
+ len = sdslen(c->ibuf);
- processdata:
- /* Are we waiting for the first line of the command of for sdf
- * count in bulk or multi bulk operations? */
if (c->replytype == REPLY_INT ||
- c->replytype == REPLY_RETCODE ||
- (c->replytype == REPLY_BULK && c->readlen == -1) ||
- (c->replytype == REPLY_MBULK && c->readlen == -1) ||
- (c->replytype == REPLY_MBULK && c->mbulk == -1)) {
- char *p;
-
- /* Check if the first line is complete. This is only true if
- * there is a newline inside the buffer. */
- if ((p = strchr(c->ibuf,'\n')) != NULL) {
- if (c->replytype == REPLY_BULK ||
- (c->replytype == REPLY_MBULK && c->mbulk != -1))
- {
- /* Read the count of a bulk reply (being it a single bulk or
- * a multi bulk reply). "$<count>" for the protocol spec. */
- *p = '\0';
- *(p-1) = '\0';
- c->readlen = atoi(c->ibuf+1)+2;
- // printf("BULK ATOI: %s\n", c->ibuf+1);
- /* Handle null bulk reply "$-1" */
- if (c->readlen-2 == -1) {
- clientDone(c);
- return;
- }
- /* Leave all the rest in the input buffer */
- c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1);
- /* fall through to reach the point where the code will try
- * to check if the bulk reply is complete. */
- } else if (c->replytype == REPLY_MBULK && c->mbulk == -1) {
- /* Read the count of a multi bulk reply. That is, how many
- * bulk replies we have to read next. "*<count>" protocol. */
- *p = '\0';
- *(p-1) = '\0';
- c->mbulk = atoi(c->ibuf+1);
- /* Handle null bulk reply "*-1" */
- if (c->mbulk == -1) {
- clientDone(c);
- return;
+ c->replytype == REPLY_RETCODE)
+ {
+ /* Check if the first line is complete. This is everything we need
+ * when waiting for an integer or status code reply.*/
+ if ((p = strstr(c->ibuf,"\r\n")) != NULL)
+ goto done;
+ } else if (c->replytype == REPLY_BULK) {
+ int advance = 0;
+ if (c->readlen < 0) {
+ advance = readLen(c->ibuf+pos,&c->readlen);
+ if (advance) {
+ pos += advance;
+ if (c->readlen == -1) {
+ goto done;
+ } else {
+ /* include the trailing \r\n */
+ c->readlen += 2;
}
- // printf("%p) %d elements list\n", c, c->mbulk);
- /* Leave all the rest in the input buffer */
- c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1);
- goto processdata;
} else {
- c->ibuf = sdstrim(c->ibuf,"\r\n");
- clientDone(c);
- return;
+ goto skip;
}
}
- }
- /* bulk read, did we read everything? */
- if (((c->replytype == REPLY_MBULK && c->mbulk != -1) ||
- (c->replytype == REPLY_BULK)) && c->readlen != -1 &&
- (unsigned)c->readlen <= sdslen(c->ibuf))
- {
- // printf("BULKSTATUS mbulk:%d readlen:%d sdslen:%d\n",
- // c->mbulk,c->readlen,sdslen(c->ibuf));
- if (c->replytype == REPLY_BULK) {
- clientDone(c);
- } else if (c->replytype == REPLY_MBULK) {
- // printf("%p) %d (%d)) ",c, c->mbulk, c->readlen);
- // fwrite(c->ibuf,c->readlen,1,stdout);
- // printf("\n");
- if (--c->mbulk == 0) {
- clientDone(c);
+
+ int canconsume;
+ if (c->readlen > 0) {
+ canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen;
+ c->readlen -= canconsume;
+ pos += canconsume;
+ }
+
+ if (c->readlen == 0)
+ goto done;
+ } else if (c->replytype == REPLY_MBULK) {
+ int advance = 0;
+ if (c->mbulk == -1) {
+ advance = readLen(c->ibuf+pos,&c->mbulk);
+ if (advance) {
+ pos += advance;
+ if (c->mbulk == -1)
+ goto done;
+ } else {
+ goto skip;
+ }
+ }
+
+ int canconsume;
+ while(c->mbulk > 0 && pos < len) {
+ if (c->readlen > 0) {
+ canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen;
+ c->readlen -= canconsume;
+ pos += canconsume;
+ if (c->readlen == 0)
+ c->mbulk--;
} else {
- c->ibuf = sdsrange(c->ibuf,c->readlen,-1);
- c->readlen = -1;
- goto processdata;
+ advance = readLen(c->ibuf+pos,&c->readlen);
+ if (advance) {
+ pos += advance;
+ if (c->readlen == -1) {
+ c->mbulk--;
+ continue;
+ } else {
+ /* include the trailing \r\n */
+ c->readlen += 2;
+ }
+ } else {
+ goto skip;
+ }
}
}
+
+ if (c->mbulk == 0)
+ goto done;
}
+
+ skip:
+ c->ibuf = sdsrange(c->ibuf,pos,-1);
+ return;
+ done:
+ clientDone(c);
+ return;
}
static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask)
client c = zmalloc(sizeof(struct _client));
char err[ANET_ERR_LEN];
- c->fd = anetTcpNonBlockConnect(err,config.hostip,config.hostport);
+ if (config.hostsocket == NULL)
+ c->fd = anetTcpNonBlockConnect(err,config.hostip,config.hostport);
+ else
+ c->fd = anetUnixNonBlockConnect(err,config.hostsocket);
+
if (c->fd == ANET_ERR) {
zfree(c);
fprintf(stderr,"Connect: %s\n",err);
}
}
- static void showLatencyReport(char *title) {
+ static void showLatencyReport(void) {
int j, seen = 0;
float perc, reqpersec;
reqpersec = (float)config.donerequests/((float)config.totlatency/1000);
if (!config.quiet) {
- printf("====== %s ======\n", title);
+ printf("====== %s ======\n", config.title);
printf(" %d requests completed in %.2f seconds\n", config.donerequests,
(float)config.totlatency/1000);
printf(" %d parallel clients\n", config.numclients);
}
printf("%.2f requests per second\n\n", reqpersec);
} else {
- printf("%s: %.2f requests per second\n", title, reqpersec);
+ printf("%s: %.2f requests per second\n", config.title, reqpersec);
}
}
- static void prepareForBenchmark(void)
- {
+ static void prepareForBenchmark(char *title) {
memset(config.latency,0,sizeof(int)*(MAX_LATENCY+1));
+ config.title = title;
config.start = mstime();
config.donerequests = 0;
}
- static void endBenchmark(char *title) {
+ static void endBenchmark(void) {
config.totlatency = mstime()-config.start;
- showLatencyReport(title);
+ showLatencyReport();
freeAllClients();
}
} else if (!strcmp(argv[i],"-p") && !lastarg) {
config.hostport = atoi(argv[i+1]);
i++;
+ } else if (!strcmp(argv[i],"-s") && !lastarg) {
+ config.hostsocket = argv[i+1];
+ i++;
} else if (!strcmp(argv[i],"-d") && !lastarg) {
config.datasize = atoi(argv[i+1]);
i++;
printf("Wrong option '%s' or option argument missing\n\n",argv[i]);
printf("Usage: redis-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests]> [-k <boolean>]\n\n");
printf(" -h <hostname> Server hostname (default 127.0.0.1)\n");
- printf(" -p <hostname> Server port (default 6379)\n");
+ printf(" -p <port> Server port (default 6379)\n");
+ printf(" -s <socket> Server socket (overrides host and port)\n");
printf(" -c <clients> Number of parallel connections (default 50)\n");
printf(" -n <requests> Total number of requests (default 10000)\n");
printf(" -d <size> Data size of SET/GET value in bytes (default 2)\n");
}
}
+ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) {
+ REDIS_NOTUSED(eventLoop);
+ REDIS_NOTUSED(id);
+ REDIS_NOTUSED(clientData);
+
+ float dt = (float)(mstime()-config.start)/1000.0;
+ float rps = (float)config.donerequests/dt;
+ printf("%s: %.2f\r", config.title, rps);
+ fflush(stdout);
+ return 250; /* every 250ms */
+ }
+
int main(int argc, char **argv) {
client c;
config.requests = 10000;
config.liveclients = 0;
config.el = aeCreateEventLoop();
+ aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
config.keepalive = 1;
config.donerequests = 0;
config.datasize = 3;
config.hostip = "127.0.0.1";
config.hostport = 6379;
+ config.hostsocket = NULL;
parseOptions(argc,argv);
if (config.idlemode) {
printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
- prepareForBenchmark();
+ prepareForBenchmark("IDLE");
c = createClient();
if (!c) exit(1);
c->obuf = sdsempty();
}
do {
- prepareForBenchmark();
+ prepareForBenchmark("PING");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"PING\r\n");
prepareClientForReply(c,REPLY_RETCODE);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("PING");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("PING (multi bulk)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"*1\r\n$4\r\nPING\r\n");
prepareClientForReply(c,REPLY_RETCODE);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("PING (multi bulk)");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("SET");
c = createClient();
if (!c) exit(1);
c->obuf = sdscatprintf(c->obuf,"SET foo_rand000000000000 %d\r\n",config.datasize);
prepareClientForReply(c,REPLY_RETCODE);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("SET");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("GET");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"GET foo_rand000000000000\r\n");
prepareClientForReply(c,REPLY_BULK);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("GET");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("INCR");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"INCR counter_rand000000000000\r\n");
prepareClientForReply(c,REPLY_INT);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("INCR");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("LPUSH");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n");
prepareClientForReply(c,REPLY_INT);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("LPUSH");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("LPOP");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LPOP mylist\r\n");
prepareClientForReply(c,REPLY_BULK);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("LPOP");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("SADD");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"SADD myset 24\r\ncounter_rand000000000000\r\n");
prepareClientForReply(c,REPLY_RETCODE);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("SADD");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("SPOP");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"SPOP myset\r\n");
prepareClientForReply(c,REPLY_BULK);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("SPOP");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("LPUSH (again, in order to bench LRANGE)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n");
prepareClientForReply(c,REPLY_RETCODE);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("LPUSH (again, in order to bench LRANGE)");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("LRANGE (first 100 elements)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LRANGE mylist 0 99\r\n");
prepareClientForReply(c,REPLY_MBULK);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("LRANGE (first 100 elements)");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("LRANGE (first 300 elements)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LRANGE mylist 0 299\r\n");
prepareClientForReply(c,REPLY_MBULK);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("LRANGE (first 300 elements)");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("LRANGE (first 450 elements)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LRANGE mylist 0 449\r\n");
prepareClientForReply(c,REPLY_MBULK);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("LRANGE (first 450 elements)");
+ endBenchmark();
- prepareForBenchmark();
+ prepareForBenchmark("LRANGE (first 600 elements)");
c = createClient();
if (!c) exit(1);
c->obuf = sdscat(c->obuf,"LRANGE mylist 0 599\r\n");
prepareClientForReply(c,REPLY_MBULK);
createMissingClients(c);
aeMain(config.el);
- endBenchmark("LRANGE (first 600 elements)");
+ endBenchmark();
printf("\n");
} while(config.loop);
#include <stdlib.h>
#include <unistd.h>
#include <ctype.h>
+ #include <errno.h>
+ #include <sys/stat.h>
#include "anet.h"
#include "sds.h"
static struct config {
char *hostip;
int hostport;
+ char *hostsocket;
long repeat;
int dbnum;
- int argn_from_stdin;
int interactive;
int shutdown;
int monitor_mode;
int pubsub_mode;
- int raw_output;
+ int raw_output; /* output mode per command */
+ int tty; /* flag for default output format */
+ int stdinarg; /* get last arg from stdin. (-x option) */
+ char mb_sep;
char *auth;
char *historyfile;
} config;
static int cliReadReply(int fd);
static void usage();
- static int cliConnect(void) {
+ /* Connect to the client. If force is not zero the connection is performed
+ * even if there is already a connected socket. */
+ static int cliConnect(int force) {
char err[ANET_ERR_LEN];
static int fd = ANET_ERR;
- if (fd == ANET_ERR) {
+ if (fd == ANET_ERR || force) {
+ if (force) close(fd);
- fd = anetTcpConnect(err,config.hostip,config.hostport);
+ if (config.hostsocket == NULL) {
+ fd = anetTcpConnect(err,config.hostip,config.hostport);
+ } else {
+ fd = anetUnixConnect(err,config.hostsocket);
- if (fd == ANET_ERR) {
- fprintf(stderr, "Could not connect to Redis at %s: %s", config.hostsocket, err);
- return -1;
- }
+ }
if (fd == ANET_ERR) {
- fprintf(stderr, "Could not connect to Redis at %s:%d: %s", config.hostip, config.hostport, err);
+ fprintf(stderr,"Could not connect to Redis at ");
+ if (config.hostsocket == NULL)
+ fprintf(stderr,"%s:%d: %s",config.hostip,config.hostport,err);
+ else
+ fprintf(stderr,"%s: %s",config.hostsocket,err);
return -1;
}
anetTcpNoDelay(NULL,fd);
ssize_t ret;
ret = read(fd,&c,1);
- if (ret == -1) {
+ if (ret <= 0) {
sdsfree(line);
return NULL;
} else if ((ret == 0) || (c == '\n')) {
if (reply == NULL) return 1;
if (!quiet)
- printf("%s\n", reply);
+ printf("%s", reply);
sdsfree(reply);
return 0;
}
}
s++;
}
- printf("\"\n");
+ printf("\"");
}
static int cliReadBulkReply(int fd) {
reply = zmalloc(bulklen);
anetRead(fd,reply,bulklen);
anetRead(fd,crlf,2);
- if (config.raw_output || !isatty(fileno(stdout))) {
+ if (config.raw_output || !config.tty) {
if (bulklen && fwrite(reply,bulklen,1,stdout) == 0) {
zfree(reply);
return 1;
static int cliReadMultiBulkReply(int fd) {
sds replylen = cliReadLine(fd);
int elements, c = 1;
+ int retval = 0;
if (replylen == NULL) return 1;
elements = atoi(replylen);
printf("(empty list or set)\n");
}
while(elements--) {
- printf("%d. ", c);
- if (cliReadReply(fd)) return 1;
+ if (config.tty) printf("%d. ", c);
+ if (cliReadReply(fd)) retval = 1;
+ if (elements) printf("%c",config.mb_sep);
c++;
}
- return 0;
+ return retval;
}
static int cliReadReply(int fd) {
char type;
+ int nread;
- if (anetRead(fd,&type,1) <= 0) {
+ if ((nread = anetRead(fd,&type,1)) <= 0) {
if (config.shutdown) return 0;
- exit(1);
+ if (config.interactive &&
+ (nread == 0 || (nread == -1 && errno == ECONNRESET)))
+ {
+ return ECONNRESET;
+ } else {
+ printf("I/O error while reading from socket: %s",strerror(errno));
+ exit(1);
+ }
}
switch(type) {
case '-':
- printf("(error) ");
+ if (config.tty) printf("(error) ");
cliReadSingleLineReply(fd,0);
return 1;
case '+':
return cliReadSingleLineReply(fd,0);
case ':':
- printf("(integer) ");
+ if (config.tty) printf("(integer) ");
return cliReadSingleLineReply(fd,0);
case '$':
return cliReadBulkReply(fd);
case '*':
return cliReadMultiBulkReply(fd);
default:
- printf("protocol error, got '%c' as reply type byte\n", type);
+ printf("protocol error, got '%c' as reply type byte", type);
return 1;
}
}
return 0;
}
+ static void showInteractiveHelp(void) {
+ printf(
+ "\n"
+ "Welcome to redis-cli " REDIS_VERSION "!\n"
+ "Just type any valid Redis command to see a pretty printed output.\n"
+ "\n"
+ "It is possible to quote strings, like in:\n"
+ " set \"my key\" \"some string \\xff\\n\"\n"
+ "\n"
+ "You can find a list of valid Redis commands at\n"
+ " http://code.google.com/p/redis/wiki/CommandReference\n"
+ "\n"
+ "Note: redis-cli supports line editing, use up/down arrows for history."
+ "\n\n");
+ }
+
static int cliSendCommand(int argc, char **argv, int repeat) {
char *command = argv[0];
int fd, j, retval = 0;
sds cmd;
config.raw_output = !strcasecmp(command,"info");
+ if (!strcasecmp(command,"help")) {
+ showInteractiveHelp();
+ return 0;
+ }
if (!strcasecmp(command,"shutdown")) config.shutdown = 1;
if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
if (!strcasecmp(command,"subscribe") ||
!strcasecmp(command,"psubscribe")) config.pubsub_mode = 1;
- if ((fd = cliConnect()) == -1) return 1;
+ if ((fd = cliConnect(0)) == -1) return 1;
/* Select db number */
retval = selectDb(fd);
while(repeat--) {
anetWrite(fd,cmd,sdslen(cmd));
while (config.monitor_mode) {
- cliReadSingleLineReply(fd,0);
+ if (cliReadSingleLineReply(fd,0)) exit(1);
+ printf("\n");
}
if (config.pubsub_mode) {
printf("Reading messages... (press Ctrl-c to quit)\n");
while (1) {
cliReadReply(fd);
- printf("\n");
+ printf("\n\n");
}
}
retval = cliReadReply(fd);
- if (retval) {
- return retval;
- }
+ if (!config.raw_output && config.tty) printf("\n");
+ if (retval) return retval;
}
return 0;
}
i++;
} else if (!strcmp(argv[i],"-h") && lastarg) {
usage();
+ } else if (!strcmp(argv[i],"-x")) {
+ config.stdinarg = 1;
} else if (!strcmp(argv[i],"-p") && !lastarg) {
config.hostport = atoi(argv[i+1]);
i++;
+ } else if (!strcmp(argv[i],"-s") && !lastarg) {
+ config.hostsocket = argv[i+1];
+ i++;
} else if (!strcmp(argv[i],"-r") && !lastarg) {
config.repeat = strtoll(argv[i+1],NULL,10);
i++;
config.auth = argv[i+1];
i++;
} else if (!strcmp(argv[i],"-i")) {
- config.interactive = 1;
+ fprintf(stderr,
+ "Starting interactive mode using -i is deprecated. Interactive mode is started\n"
+ "by default when redis-cli is executed without a command to execute.\n"
+ );
} else if (!strcmp(argv[i],"-c")) {
- config.argn_from_stdin = 1;
+ fprintf(stderr,
+ "Reading last argument from standard input using -c is deprecated.\n"
+ "When standard input is connected to a pipe or regular file, it is\n"
+ "automatically used as last argument.\n"
+ );
} else if (!strcmp(argv[i],"-v")) {
- printf("redis-cli shipped with Redis verison %s\n", REDIS_VERSION);
+ printf("redis-cli shipped with Redis version %s\n", REDIS_VERSION);
exit(0);
} else {
break;
}
static void usage() {
- fprintf(stderr, "usage: redis-cli [-iv] [-h host] [-p port] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 arg3 ... argN\n");
+ fprintf(stderr, "usage: redis-cli [-iv] [-h host] [-p port] [-s /path/to/socket] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 arg3 ... argN\n");
- fprintf(stderr, "usage: echo \"argN\" | redis-cli -c [-h host] [-p port] [-s /path/to/socket] [-a authpw] [-r repeat_times] [-n db_num] cmd arg1 arg2 ... arg(N-1)\n");
- fprintf(stderr, "\nIf a pipe from standard input is detected this data is used as last argument.\n\n");
- fprintf(stderr, "example: cat /etc/passwd | redis-cli set my_passwd\n");
+ fprintf(stderr, "usage: echo \"argN\" | redis-cli -x [options] cmd arg1 arg2 ... arg(N-1)\n\n");
+ fprintf(stderr, "example: cat /etc/passwd | redis-cli -x set my_passwd\n");
fprintf(stderr, "example: redis-cli get my_passwd\n");
fprintf(stderr, "example: redis-cli -r 100 lpush mylist x\n");
fprintf(stderr, "\nRun in interactive mode: redis-cli -i or just don't pass any command\n");
return sds;
}
- static char **splitArguments(char *line, int *argc) {
- char *p = line;
- char *current = NULL;
- char **vector = NULL;
-
- *argc = 0;
- while(1) {
- /* skip blanks */
- while(*p && isspace(*p)) p++;
- if (*p) {
- /* get a token */
- int inq=0; /* set to 1 if we are in "quotes" */
- int done = 0;
-
- if (current == NULL) current = sdsempty();
- while(!done) {
- if (inq) {
- if (*p == '\\' && *(p+1)) {
- char c;
-
- p++;
- switch(*p) {
- case 'n': c = '\n'; break;
- case 'r': c = '\r'; break;
- case 't': c = '\t'; break;
- case 'b': c = '\b'; break;
- case 'a': c = '\a'; break;
- default: c = *p; break;
- }
- current = sdscatlen(current,&c,1);
- } else if (*p == '"') {
- done = 1;
- } else {
- current = sdscatlen(current,p,1);
- }
- } else {
- switch(*p) {
- case ' ':
- case '\n':
- case '\r':
- case '\t':
- case '\0':
- done=1;
- break;
- case '"':
- inq=1;
- break;
- default:
- current = sdscatlen(current,p,1);
- break;
- }
- }
- if (*p) p++;
- }
- /* add the token to the vector */
- vector = zrealloc(vector,((*argc)+1)*sizeof(char*));
- vector[*argc] = current;
- (*argc)++;
- current = NULL;
- } else {
- return vector;
- }
- }
- }
-
#define LINE_BUFLEN 4096
static void repl() {
int argc, j;
- char *line, **argv;
+ char *line;
+ sds *argv;
+ config.interactive = 1;
while((line = linenoise("redis> ")) != NULL) {
if (line[0] != '\0') {
- argv = splitArguments(line,&argc);
+ argv = sdssplitargs(line,&argc);
linenoiseHistoryAdd(line);
if (config.historyfile) linenoiseHistorySave(config.historyfile);
- if (argc > 0) {
+ if (argv == NULL) {
+ printf("Invalid argument(s)\n");
+ continue;
+ } else if (argc > 0) {
if (strcasecmp(argv[0],"quit") == 0 ||
strcasecmp(argv[0],"exit") == 0)
- exit(0);
- else
- cliSendCommand(argc, argv, 1);
+ {
+ exit(0);
+ } else {
+ int err;
+
+ if ((err = cliSendCommand(argc, argv, 1)) != 0) {
+ if (err == ECONNRESET) {
+ printf("Reconnecting... ");
+ fflush(stdout);
+ if (cliConnect(1) == -1) exit(1);
+ printf("OK\n");
+ cliSendCommand(argc,argv,1);
+ }
+ }
+ }
}
/* Free the argument vector */
for (j = 0; j < argc; j++)
exit(0);
}
+ static int noninteractive(int argc, char **argv) {
+ int retval = 0;
+ if (config.stdinarg) {
+ argv = zrealloc(argv, (argc+1)*sizeof(char*));
+ argv[argc] = readArgFromStdin();
+ retval = cliSendCommand(argc+1, argv, config.repeat);
+ } else {
+ /* stdin is probably a tty, can be tested with S_ISCHR(s.st_mode) */
+ retval = cliSendCommand(argc, argv, config.repeat);
+ }
+ return retval;
+ }
+
int main(int argc, char **argv) {
int firstarg;
- char **argvcopy;
config.hostip = "127.0.0.1";
config.hostport = 6379;
+ config.hostsocket = NULL;
config.repeat = 1;
config.dbnum = 0;
- config.argn_from_stdin = 0;
- config.shutdown = 0;
config.interactive = 0;
+ config.shutdown = 0;
config.monitor_mode = 0;
config.pubsub_mode = 0;
config.raw_output = 0;
+ config.stdinarg = 0;
config.auth = NULL;
config.historyfile = NULL;
+ config.tty = isatty(fileno(stdout)) || (getenv("FAKETTY") != NULL);
+ config.mb_sep = '\n';
if (getenv("HOME") != NULL) {
config.historyfile = malloc(256);
if (config.auth != NULL) {
char *authargv[2];
+ int dbnum = config.dbnum;
+ /* We need to save the real configured database number and set it to
+ * zero here, otherwise cliSendCommand() will try to perform the
+ * SELECT command before the authentication, and it will fail. */
+ config.dbnum = 0;
authargv[0] = "AUTH";
authargv[1] = config.auth;
cliSendCommand(2, convertToSds(2, authargv), 1);
+ config.dbnum = dbnum; /* restore the right DB number */
}
- if (argc == 0 || config.interactive == 1) repl();
-
- argvcopy = convertToSds(argc+1, argv);
- if (config.argn_from_stdin) {
- sds lastarg = readArgFromStdin();
- argvcopy[argc] = lastarg;
- argc++;
- }
- return cliSendCommand(argc, argvcopy, config.repeat);
+ /* Start interactive mode when no command is provided */
+ if (argc == 0) repl();
+ /* Otherwise, we have some arguments to execute */
+ return noninteractive(argc,convertToSds(argc,argv));
}
#include <float.h>
#include <math.h>
#include <pthread.h>
+ #include <sys/resource.h>
/* Our shared "common" objects */
{"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
+ {"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
{"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
{"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
{"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
NULL, /* val dup */
dictEncObjKeyCompare, /* key compare */
dictRedisObjectDestructor, /* key destructor */
- dictVanillaFree /* val destructor of malloc(sizeof(double)) */
+ NULL /* val destructor */
};
/* Db->dict, keys are sds strings, vals are Redis objects. */
/* ======================= Cron: called every 100 ms ======================== */
+ /* Try to expire a few timed out keys. The algorithm used is adaptive and
+ * will use few CPU cycles if there are few expiring keys, otherwise
+ * it will get more aggressive to avoid that too much memory is used by
+ * keys that can be removed from the keyspace. */
+ void activeExpireCycle(void) {
+ int j;
+
+ for (j = 0; j < server.dbnum; j++) {
+ int expired;
+ redisDb *db = server.db+j;
+
+ /* Continue to expire if at the end of the cycle more than 25%
+ * of the keys were expired. */
+ do {
+ long num = dictSize(db->expires);
+ time_t now = time(NULL);
+
+ expired = 0;
+ if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
+ num = REDIS_EXPIRELOOKUPS_PER_CRON;
+ while (num--) {
+ dictEntry *de;
+ time_t t;
+
+ if ((de = dictGetRandomKey(db->expires)) == NULL) break;
+ t = (time_t) dictGetEntryVal(de);
+ if (now > t) {
+ sds key = dictGetEntryKey(de);
+ robj *keyobj = createStringObject(key,sdslen(key));
+
+ propagateExpire(db,keyobj);
+ dbDelete(db,keyobj);
+ decrRefCount(keyobj);
+ expired++;
+ server.stat_expiredkeys++;
+ }
+ }
+ } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
+ }
+ }
+
+
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j, loops = server.cronloops++;
REDIS_NOTUSED(eventLoop);
}
}
- /* Try to expire a few timed out keys. The algorithm used is adaptive and
- * will use few CPU cycles if there are few expiring keys, otherwise
- * it will get more aggressive to avoid that too much memory is used by
- * keys that can be removed from the keyspace. */
- for (j = 0; j < server.dbnum; j++) {
- int expired;
- redisDb *db = server.db+j;
-
- /* Continue to expire if at the end of the cycle more than 25%
- * of the keys were expired. */
- do {
- long num = dictSize(db->expires);
- time_t now = time(NULL);
-
- expired = 0;
- if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
- num = REDIS_EXPIRELOOKUPS_PER_CRON;
- while (num--) {
- dictEntry *de;
- time_t t;
-
- if ((de = dictGetRandomKey(db->expires)) == NULL) break;
- t = (time_t) dictGetEntryVal(de);
- if (now > t) {
- sds key = dictGetEntryKey(de);
- robj *keyobj = createStringObject(key,sdslen(key));
-
- dbDelete(db,keyobj);
- decrRefCount(keyobj);
- expired++;
- server.stat_expiredkeys++;
- }
- }
- } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
- }
+ /* Expire a few keys per cycle, only if this is a master.
+ * On slaves we wait for DEL operations synthesized by the master
+ * in order to guarantee a strict consistency. */
+ if (server.masterhost == NULL) activeExpireCycle();
/* Swap a few keys on disk if we are over the memory limit and VM
* is enbled. Try to free objects from the free list first. */
}
void initServerConfig() {
- server.dbnum = REDIS_DEFAULT_DBNUM;
server.port = REDIS_SERVERPORT;
+ server.bindaddr = NULL;
+ server.unixsocket = NULL;
+ server.ipfd = -1;
+ server.sofd = -1;
+ server.dbnum = REDIS_DEFAULT_DBNUM;
server.verbosity = REDIS_VERBOSE;
server.maxidletime = REDIS_MAXIDLETIME;
server.saveparams = NULL;
server.logfile = NULL; /* NULL = log on standard output */
- server.bindaddr = NULL;
server.glueoutputbuf = 1;
server.daemonize = 0;
server.appendonly = 0;
server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
+ server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
server.shutdown_asap = 0;
resetServerSaveParams();
createSharedObjects();
server.el = aeCreateEventLoop();
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
- server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
- if (server.fd == -1) {
- redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
+ server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
+ if (server.ipfd == ANET_ERR) {
+ redisLog(REDIS_WARNING, "Opening port: %s", server.neterr);
+ exit(1);
+ }
+ if (server.unixsocket != NULL) {
+ unlink(server.unixsocket); /* don't care if this fails */
+ server.sofd = anetUnixServer(server.neterr,server.unixsocket);
+ if (server.sofd == ANET_ERR) {
+ redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
+ exit(1);
+ }
+ }
+ if (server.ipfd < 0 && server.sofd < 0) {
+ redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
for (j = 0; j < server.dbnum; j++) {
server.stat_starttime = time(NULL);
server.unixtime = time(NULL);
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
- if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
- acceptHandler, NULL) == AE_ERR) oom("creating file event");
+ if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
+ acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
+ if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
+ acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
if (server.appendonly) {
server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
int processCommand(redisClient *c) {
struct redisCommand *cmd;
- /* Free some memory if needed (maxmemory setting) */
- if (server.maxmemory) freeMemoryIfNeeded();
-
/* Handle the multi bulk command type. This is an alternative protocol
* supported by Redis in order to receive commands that are composed of
* multiple binary-safe "bulk" arguments. The latency of processing is
} else if (c->multibulk) {
if (c->bulklen == -1) {
if (((char*)c->argv[0]->ptr)[0] != '$') {
- addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
+ addReplyError(c,"multi bulk protocol error");
resetClient(c);
return 1;
} else {
- int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
+ char *eptr;
+ long bulklen = strtol(((char*)c->argv[0]->ptr)+1,&eptr,10);
+ int perr = eptr[0] != '\0';
+
decrRefCount(c->argv[0]);
- if (bulklen < 0 || bulklen > 1024*1024*1024) {
+ if (perr || bulklen == LONG_MIN || bulklen == LONG_MAX ||
+ bulklen < 0 || bulklen > 1024*1024*1024)
+ {
c->argc--;
- addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
+ addReplyError(c,"invalid bulk write count");
resetClient(c);
return 1;
}
* such wrong arity, bad command name and so forth. */
cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) {
- addReplySds(c,
- sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
- (char*)c->argv[0]->ptr));
+ addReplyErrorFormat(c,"unknown command '%s'",
+ (char*)c->argv[0]->ptr);
resetClient(c);
return 1;
} else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
(c->argc < -cmd->arity)) {
- addReplySds(c,
- sdscatprintf(sdsempty(),
- "-ERR wrong number of arguments for '%s' command\r\n",
- cmd->name));
+ addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
+ cmd->name);
resetClient(c);
return 1;
} else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
/* This is a bulk command, we have to read the last argument yet. */
- int bulklen = atoi(c->argv[c->argc-1]->ptr);
+ char *eptr;
+ long bulklen = strtol(c->argv[c->argc-1]->ptr,&eptr,10);
+ int perr = eptr[0] != '\0';
decrRefCount(c->argv[c->argc-1]);
- if (bulklen < 0 || bulklen > 1024*1024*1024) {
+ if (perr || bulklen == LONG_MAX || bulklen == LONG_MIN ||
+ bulklen < 0 || bulklen > 1024*1024*1024)
+ {
c->argc--;
- addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
+ addReplyError(c,"invalid bulk write count");
resetClient(c);
return 1;
}
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
- addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
+ addReplyError(c,"operation not permitted");
resetClient(c);
return 1;
}
- /* Handle the maxmemory directive */
+ /* Handle the maxmemory directive.
+ *
+ * First we try to free some memory if possible (if there are volatile
+ * keys in the dataset). If there are not the only thing we can do
+ * is returning an error. */
+ if (server.maxmemory) freeMemoryIfNeeded();
if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
zmalloc_used_memory() > server.maxmemory)
{
- addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
+ addReplyError(c,"command not allowed when used memory > 'maxmemory'");
resetClient(c);
return 1;
}
&&
cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
- addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n"));
+ addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
resetClient(c);
return 1;
}
if (server.vm_enabled) unlink(server.vm_swap_file);
} else {
/* Snapshotting. Perform a SYNC SAVE and exit */
- if (rdbSave(server.dbfilename) == REDIS_OK) {
- if (server.daemonize)
- unlink(server.pidfile);
- redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
- } else {
+ if (rdbSave(server.dbfilename) != REDIS_OK) {
/* Ooops.. error saving! The best we can do is to continue
* operating. Note that if there was a background saving process,
* in the next cron() Redis will be notified that the background
return REDIS_ERR;
}
}
+ if (server.daemonize) unlink(server.pidfile);
redisLog(REDIS_WARNING,"Server exit now, bye bye...");
return REDIS_OK;
}
addReply(c,shared.ok);
} else {
c->authenticated = 0;
- addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
+ addReplyError(c,"invalid password");
}
}
time_t uptime = time(NULL)-server.stat_starttime;
int j;
char hmem[64];
+ struct rusage self_ru, c_ru;
+
+ getrusage(RUSAGE_SELF, &self_ru);
+ getrusage(RUSAGE_CHILDREN, &c_ru);
bytesToHuman(hmem,zmalloc_used_memory());
info = sdscatprintf(sdsempty(),
"process_id:%ld\r\n"
"uptime_in_seconds:%ld\r\n"
"uptime_in_days:%ld\r\n"
+ "used_cpu_sys:%.2f\r\n"
+ "used_cpu_user:%.2f\r\n"
+ "used_cpu_sys_childrens:%.2f\r\n"
+ "used_cpu_user_childrens:%.2f\r\n"
"connected_clients:%d\r\n"
"connected_slaves:%d\r\n"
"blocked_clients:%d\r\n"
"used_memory:%zu\r\n"
"used_memory_human:%s\r\n"
+ "mem_fragmentation_ratio:%.2f\r\n"
"changes_since_last_save:%lld\r\n"
"bgsave_in_progress:%d\r\n"
"last_save_time:%ld\r\n"
(long) getpid(),
uptime,
uptime/(3600*24),
+ (float)self_ru.ru_utime.tv_sec+(float)self_ru.ru_utime.tv_usec/1000000,
+ (float)self_ru.ru_stime.tv_sec+(float)self_ru.ru_stime.tv_usec/1000000,
+ (float)c_ru.ru_utime.tv_sec+(float)c_ru.ru_utime.tv_usec/1000000,
+ (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
server.blpop_blocked_clients,
zmalloc_used_memory(),
hmem,
+ zmalloc_get_fragmentation_ratio(),
server.dirty,
server.bgsavechildpid != -1,
server.lastsave,
if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
for (j = 0; j < server.dbnum; j++) {
int minttl = -1;
- robj *minkey = NULL;
+ sds minkey = NULL;
+ robj *keyobj = NULL;
struct dictEntry *de;
if (dictSize(server.db[j].expires)) {
minttl = t;
}
}
- dbDelete(server.db+j,minkey);
+ keyobj = createStringObject(minkey,sdslen(minkey));
+ dbDelete(server.db+j,keyobj);
+ server.stat_expiredkeys++;
+ decrRefCount(keyobj);
}
}
if (!freed) return; /* nothing to free... */
}
#endif /* __linux__ */
+ void createPidFile(void) {
+ /* Try to write the pid file in a best-effort way. */
+ FILE *fp = fopen(server.pidfile,"w");
+ if (fp) {
+ fprintf(fp,"%d\n",getpid());
+ fclose(fp);
+ }
+ }
+
void daemonize(void) {
int fd;
- FILE *fp;
if (fork() != 0) exit(0); /* parent exits */
setsid(); /* create a new session */
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
- /* Try to write the pid file */
- fp = fopen(server.pidfile,"w");
- if (fp) {
- fprintf(fp,"%d\n",getpid());
- fclose(fp);
- }
}
void version() {
}
if (server.daemonize) daemonize();
initServer();
+ if (server.daemonize) createPidFile();
redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
#ifdef __linux__
linuxOvercommitMemoryWarning();
if (rdbLoad(server.dbfilename) == REDIS_OK)
redisLog(REDIS_NOTICE,"DB loaded from disk: %ld seconds",time(NULL)-start);
}
- redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+ if (server.ipfd > 0)
+ redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+ if (server.sofd > 0)
+ redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
redisLog(REDIS_WARNING,"%s", messages[i]);
/* free(messages); Don't call free() with possibly corrupted memory. */
+ if (server.daemonize) unlink(server.pidfile);
_exit(0);
}
#include "anet.h" /* Networking the easy way */
#include "zipmap.h" /* Compact string -> string data structure */
#include "ziplist.h" /* Compact list data structure */
+ #include "intset.h" /* Compact integer set structure */
#include "version.h"
/* Error codes */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
#define REDIS_SHARED_INTEGERS 10000
+ #define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
#define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
+ #define REDIS_ENCODING_INTSET 6 /* Encoded as intset */
/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_HASH_MAX_ZIPMAP_VALUE 512
#define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024
#define REDIS_LIST_MAX_ZIPLIST_VALUE 32
+ #define REDIS_SET_MAX_INTSET_ENTRIES 4096
/* Sets operations codes */
#define REDIS_OP_UNION 0
int dictid;
sds querybuf;
robj **argv, **mbargv;
+ char *newline; /* pointing to the detected newline in querybuf */
int argc, mbargc;
- int bulklen; /* bulk read len. -1 if not in bulk read mode */
+ long bulklen; /* bulk read len. -1 if not in bulk read mode */
int multibulk; /* multi bulk command format active */
list *reply;
int sentlen;
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
+
+ /* Response buffer */
+ int bufpos;
+ char buf[REDIS_REPLY_CHUNK_BYTES];
} redisClient;
struct saveparam {
struct redisServer {
pthread_t mainthread;
int port;
- int fd;
+ char *bindaddr;
+ char *unixsocket;
+ int ipfd;
+ int sofd;
redisDb *db;
long long dirty; /* changes to DB from the last save */
+ long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */
list *clients;
list *slaves, *monitors;
char neterr[ANET_ERR_LEN];
struct saveparam *saveparams;
int saveparamslen;
char *logfile;
- char *bindaddr;
char *dbfilename;
char *appendfilename;
char *requirepass;
size_t hash_max_zipmap_value;
size_t list_max_ziplist_entries;
size_t list_max_ziplist_value;
+ size_t set_max_intset_entries;
/* Virtual memory state */
FILE *vm_fp;
int vm_fd;
} redisSortOperation;
/* ZSETs use a specialized version of Skiplists */
-
typedef struct zskiplistNode {
- struct zskiplistNode **forward;
- struct zskiplistNode *backward;
- unsigned int *span;
- double score;
robj *obj;
+ double score;
+ struct zskiplistNode *backward;
+ struct zskiplistLevel {
+ struct zskiplistNode *forward;
+ unsigned int span;
+ } level[];
} zskiplistNode;
typedef struct zskiplist {
listNode *ln; /* Entry in linked list */
} listTypeEntry;
+ /* Structure to hold set iteration abstraction. */
+ typedef struct {
+ robj *subject;
+ int encoding;
+ int ii; /* intset iterator */
+ dictIterator *di;
+ } setTypeIterator;
+
/* Structure to hold hash iteration abstration. Note that iteration over
* hashes involves both fields and values. Because it is possible that
* not both are required, store pointers in the iterator to avoid
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
void addReply(redisClient *c, robj *obj);
+ void *addDeferredMultiBulkLength(redisClient *c);
+ void setDeferredMultiBulkLength(redisClient *c, void *node, long length);
void addReplySds(redisClient *c, sds s);
void processInputBuffer(redisClient *c);
-void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
+void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
+void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
void addReplyBulk(redisClient *c, robj *obj);
void addReplyBulkCString(redisClient *c, char *s);
void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void addReply(redisClient *c, robj *obj);
void addReplySds(redisClient *c, sds s);
+ void addReplyError(redisClient *c, char *err);
+ void addReplyStatus(redisClient *c, char *status);
void addReplyDouble(redisClient *c, double d);
void addReplyLongLong(redisClient *c, long long ll);
- void addReplyUlong(redisClient *c, unsigned long ul);
+ void addReplyMultiBulkLen(redisClient *c, long length);
void *dupClientReplyValue(void *o);
+ #ifdef __GNUC__
+ void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
+ __attribute__((format(printf, 2, 3)));
+ void addReplyStatusFormat(redisClient *c, const char *fmt, ...)
+ __attribute__((format(printf, 2, 3)));
+ #else
+ void addReplyErrorFormat(redisClient *c, const char *fmt, ...);
+ void addReplyStatusFormat(redisClient *c, const char *fmt, ...);
+ #endif
+
/* List data type */
void listTypeTryConversion(robj *subject, robj *value);
void listTypePush(robj *subject, robj *value, int where);
robj *createListObject(void);
robj *createZiplistObject(void);
robj *createSetObject(void);
+ robj *createIntsetObject(void);
robj *createHashObject(void);
robj *createZsetObject(void);
int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg);
/* Sorted sets data type */
zskiplist *zslCreate(void);
void zslFree(zskiplist *zsl);
- void zslInsert(zskiplist *zsl, double score, robj *obj);
+ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
/* Core functions */
void freeMemoryIfNeeded(void);
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
vmpointer *vmSwapObjectBlocking(robj *val);
+ /* Set data type */
+ robj *setTypeCreate(robj *value);
+ int setTypeAdd(robj *subject, robj *value);
+ int setTypeRemove(robj *subject, robj *value);
+ int setTypeIsMember(robj *subject, robj *value);
+ setTypeIterator *setTypeInitIterator(robj *subject);
+ void setTypeReleaseIterator(setTypeIterator *si);
+ robj *setTypeNext(setTypeIterator *si);
+ robj *setTypeRandomElement(robj *subject);
+ unsigned long setTypeSize(robj *subject);
+ void setTypeConvert(robj *subject, int enc);
+
/* Hash data type */
void convertToRealHash(robj *o);
void hashTypeTryConversion(robj *subject, robj **argv, int start, int end);
long long memtoll(const char *p, int *err);
int ll2string(char *s, size_t len, long long value);
int isStringRepresentableAsLong(sds s, long *longval);
+ int isStringRepresentableAsLongLong(sds s, long long *longval);
+ int isObjectRepresentableAsLongLong(robj *o, long long *llongval);
/* Configuration */
void loadServerConfig(char *filename);
/* db.c -- Keyspace access API */
int removeExpire(redisDb *db, robj *key);
+ void propagateExpire(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key);
- int deleteIfVolatile(redisDb *db, robj *key);
time_t getExpire(redisDb *db, robj *key);
- int setExpire(redisDb *db, robj *key, time_t when);
+ void setExpire(redisDb *db, robj *key, time_t when);
robj *lookupKey(redisDb *db, robj *key);
robj *lookupKeyRead(redisDb *db, robj *key);
robj *lookupKeyWrite(redisDb *db, robj *key);
void expireatCommand(redisClient *c);
void getsetCommand(redisClient *c);
void ttlCommand(redisClient *c);
+ void persistCommand(redisClient *c);
void slaveofCommand(redisClient *c);
void debugCommand(redisClient *c);
void msetCommand(redisClient *c);