#include <fcntl.h>
#include <sys/time.h>
#include <sys/resource.h>
+#include <sys/uio.h>
#include <limits.h>
#include <math.h>
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
-#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
+#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
+
+/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
+#define REDIS_WRITEV_THRESHOLD 3
+/* Max number of iovecs used for each writev call */
+#define REDIS_WRITEV_IOVEC_COUNT 256
/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
/* Sort operations */
#define REDIS_SORT_GET 0
-#define REDIS_SORT_DEL 1
-#define REDIS_SORT_INCR 2
-#define REDIS_SORT_DECR 3
-#define REDIS_SORT_ASC 4
-#define REDIS_SORT_DESC 5
+#define REDIS_SORT_ASC 1
+#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024
/* Log levels */
int shareobjects;
/* Replication related */
int isslave;
+ char *masterauth;
char *masterhost;
int masterport;
redisClient *master; /* client that is master for this slave */
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
+static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
+static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
{"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
{"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
{"lrem",lremCommand,4,REDIS_CMD_BULK},
+ {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
{"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
{"srem",sremCommand,3,REDIS_CMD_BULK},
{"smove",smoveCommand,4,REDIS_CMD_BULK},
{"debug",debugCommand,-2,REDIS_CMD_INLINE},
{NULL,NULL,0,0}
};
+
/*============================ Utility functions ============================ */
/* Glob-style pattern matching. */
/* Check if a background saving in progress terminated */
if (server.bgsaveinprogress) {
int statloc;
- if (wait4(-1,&statloc,WNOHANG,NULL)) {
+ if (wait3(&statloc,WNOHANG,NULL)) {
int exitcode = WEXITSTATUS(statloc);
int bysignal = WIFSIGNALED(statloc);
}
}
- /* Try to expire a few timed out keys */
+ /* Try to expire a few timed out keys. The algorithm used is adaptive and
+ * will use few CPU cycles if there are few expiring keys, otherwise
+ * it will get more aggressive to avoid that too much memory is used by
+ * keys that can be removed from the keyspace. */
for (j = 0; j < server.dbnum; j++) {
+ int expired;
redisDb *db = server.db+j;
- int num = dictSize(db->expires);
- if (num) {
+ /* Continue to expire if at the end of the cycle more than 25%
+ * of the keys were expired. */
+ do {
+ int num = dictSize(db->expires);
time_t now = time(NULL);
+ expired = 0;
if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
num = REDIS_EXPIRELOOKUPS_PER_CRON;
while (num--) {
t = (time_t) dictGetEntryVal(de);
if (now > t) {
deleteKey(db,dictGetEntryKey(de));
+ expired++;
}
}
- }
+ } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
}
/* Check if we should connect to a MASTER */
server.saveparamslen++;
}
-static void ResetServerSaveParams() {
+static void resetServerSaveParams() {
zfree(server.saveparams);
server.saveparams = NULL;
server.saveparamslen = 0;
server.sharingpoolsize = 1024;
server.maxclients = 0;
server.maxmemory = 0;
- ResetServerSaveParams();
+ resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
/* Replication related */
server.isslave = 0;
+ server.masterauth = NULL;
server.masterhost = NULL;
server.masterport = 6379;
server.master = NULL;
server.stat_numcommands = 0;
server.stat_numconnections = 0;
server.stat_starttime = time(NULL);
- aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
+ aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
if (server.appendonly) {
server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
server.masterhost = sdsnew(argv[1]);
server.masterport = atoi(argv[2]);
server.replstate = REDIS_REPL_CONNECT;
+ } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
+ server.masterauth = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
- if (strcasecmp(argv[1],"no")) {
+ if (!strcasecmp(argv[1],"no")) {
server.appendfsync = APPENDFSYNC_NO;
- } else if (strcasecmp(argv[1],"always")) {
+ } else if (!strcasecmp(argv[1],"always")) {
server.appendfsync = APPENDFSYNC_ALWAYS;
- } else if (strcasecmp(argv[1],"everysec")) {
+ } else if (!strcasecmp(argv[1],"everysec")) {
server.appendfsync = APPENDFSYNC_EVERYSEC;
} else {
err = "argument must be 'no', 'always' or 'everysec'";
zfree(c);
}
+#define GLUEREPLY_UP_TO (1024)
static void glueReplyBuffersIfNeeded(redisClient *c) {
- int totlen = 0;
+ int copylen = 0;
+ char buf[GLUEREPLY_UP_TO];
listNode *ln;
robj *o;
listRewind(c->reply);
while((ln = listYield(c->reply))) {
+ int objlen;
+
o = ln->value;
- totlen += sdslen(o->ptr);
- /* This optimization makes more sense if we don't have to copy
- * too much data */
- if (totlen > 1024) return;
- }
- if (totlen > 0) {
- char buf[1024];
- int copylen = 0;
-
- listRewind(c->reply);
- while((ln = listYield(c->reply))) {
- o = ln->value;
- memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
- copylen += sdslen(o->ptr);
+ objlen = sdslen(o->ptr);
+ if (copylen + objlen <= GLUEREPLY_UP_TO) {
+ memcpy(buf+copylen,o->ptr,objlen);
+ copylen += objlen;
listDelNode(c->reply,ln);
+ } else {
+ if (copylen == 0) return;
+ break;
}
- /* Now the output buffer is empty, add the new single element */
- o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
- listAddNodeTail(c->reply,o);
}
+ /* Now the output buffer is empty, add the new single element */
+ o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
+ listAddNodeHead(c->reply,o);
}
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
- if (server.glueoutputbuf && listLength(c->reply) > 1)
- glueReplyBuffersIfNeeded(c);
+
+ /* Use writev() if we have enough buffers to send */
+ if (!server.glueoutputbuf &&
+ listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
+ !(c->flags & REDIS_MASTER))
+ {
+ sendReplyToClientWritev(el, fd, privdata, mask);
+ return;
+ }
+
while(listLength(c->reply)) {
+ if (server.glueoutputbuf && listLength(c->reply) > 1)
+ glueReplyBuffersIfNeeded(c);
+
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr);
c->sentlen = 0;
}
/* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
- * bytes, in a single threaded server it's a good idea to server
+ * bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world
- * terms think to 'KEYS *' against the loopback interfae) */
+ * scenario think about 'KEYS *' against the loopback interface) */
if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
}
if (nwritten == -1) {
}
}
+/* Writable-event handler that flushes the pending reply list of the client
+ * passed in 'privdata' using writev(), batching up to
+ * REDIS_WRITEV_IOVEC_COUNT reply objects per system call.
+ *
+ * 'el' and 'mask' are unused. 'fd' is the descriptor written to.
+ *
+ * No more than roughly REDIS_MAX_WRITE_PER_EVENT bytes are queued per call,
+ * with one exception: the first buffer of an event is always queued, even
+ * if it is larger than the budget, otherwise a reply object bigger than
+ * REDIS_MAX_WRITE_PER_EVENT sitting at the head of the list would never be
+ * transmitted and the client would stall forever.
+ *
+ * On a write error different from EAGAIN the client is freed. When the
+ * reply list becomes empty the AE_WRITABLE event is removed. */
+static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
+{
+    redisClient *c = privdata;
+    int nwritten = 0, totwritten = 0, objlen, willwrite;
+    robj *o;
+    struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
+    int offset, ion = 0;
+    REDIS_NOTUSED(el);
+    REDIS_NOTUSED(mask);
+
+    listNode *node;
+    while (listLength(c->reply)) {
+        /* The first object may have been partially sent already */
+        offset = c->sentlen;
+        ion = 0;
+        willwrite = 0;
+
+        /* fill-in the iov[] array */
+        for(node = listFirst(c->reply); node; node = listNextNode(node)) {
+            o = listNodeValue(node);
+            objlen = sdslen(o->ptr);
+
+            /* Honour the per-event budget, counting both the bytes already
+             * written and the bytes already queued in this batch. The check
+             * is skipped while nothing was written or queued yet, so that
+             * an oversized head object still makes progress. */
+            if ((totwritten > 0 || willwrite > 0) &&
+                totwritten + willwrite + objlen - offset >
+                    REDIS_MAX_WRITE_PER_EVENT)
+                break;
+
+            if(ion == REDIS_WRITEV_IOVEC_COUNT)
+                break; /* no more iovecs */
+
+            iov[ion].iov_base = ((char*)o->ptr) + offset;
+            iov[ion].iov_len = objlen - offset;
+            willwrite += objlen - offset;
+            offset = 0; /* just for the first item */
+            ion++;
+        }
+
+        /* Nothing to send in this round: budget exhausted */
+        if(willwrite == 0)
+            break;
+
+        /* write all collected blocks at once */
+        if((nwritten = writev(fd, iov, ion)) < 0) {
+            if (errno != EAGAIN) {
+                redisLog(REDIS_DEBUG,
+                    "Error writing to client: %s", strerror(errno));
+                freeClient(c);
+                return;
+            }
+            break;
+        }
+
+        totwritten += nwritten;
+        offset = c->sentlen;
+
+        /* remove written robjs from c->reply */
+        while (nwritten && listLength(c->reply)) {
+            o = listNodeValue(listFirst(c->reply));
+            objlen = sdslen(o->ptr);
+
+            if(nwritten >= objlen - offset) {
+                /* Object fully transmitted: drop it from the list */
+                listDelNode(c->reply, listFirst(c->reply));
+                nwritten -= objlen - offset;
+                c->sentlen = 0;
+            } else {
+                /* partial write: remember how far we got in the head */
+                c->sentlen += nwritten;
+                break;
+            }
+            offset = 0;
+        }
+    }
+
+    if (totwritten > 0)
+        c->lastinteraction = time(NULL);
+
+    if (listLength(c->reply) == 0) {
+        c->sentlen = 0;
+        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
+    }
+}
+
static struct redisCommand *lookupCommand(char *name) {
int j = 0;
while(cmdTable[j].name != NULL) {
/* Exec the command */
dirty = server.dirty;
cmd->proc(c);
- if (server.appendonly != 0)
+ if (server.appendonly && server.dirty-dirty)
feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
- if (server.dirty-dirty != 0 && listLength(server.slaves))
+ if (server.dirty-dirty && listLength(server.slaves))
replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
if (listLength(server.monitors))
replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
if (outv != static_outv) zfree(outv);
}
-/* TODO: translate EXPIREs into EXPIRETOs */
-static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
- sds buf = sdsempty();
- int j;
- ssize_t nwritten;
- time_t now;
- robj *tmpargv[3];
-
- /* The DB this command was targetting is not the same as the last command
- * we appendend. To issue a SELECT command is needed. */
- if (dictid != server.appendseldb) {
- char seldb[64];
-
- snprintf(seldb,sizeof(seldb),"%d",dictid);
- buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
- strlen(seldb),seldb);
- server.appendseldb = dictid;
- }
-
- /* "Fix" the argv vector if the command is EXPIRE. We want to translate
- * EXPIREs into EXPIREATs calls */
- if (cmd->proc == expireCommand) {
- long when;
-
- tmpargv[0] = createStringObject("EXPIREAT",8);
- tmpargv[1] = argv[1];
- incrRefCount(argv[1]);
- when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
- tmpargv[2] = createObject(REDIS_STRING,
- sdscatprintf(sdsempty(),"%ld",when));
- argv = tmpargv;
- }
-
- /* Append the actual command */
- buf = sdscatprintf(buf,"*%d\r\n",argc);
- for (j = 0; j < argc; j++) {
- robj *o = argv[j];
-
- if (o->encoding != REDIS_ENCODING_RAW)
- o = getDecodedObject(o);
- buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
- buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
- buf = sdscatlen(buf,"\r\n",2);
- if (o != argv[j])
- decrRefCount(o);
- }
-
- /* Free the objects from the modified argv for EXPIREAT */
- if (cmd->proc == expireCommand) {
- for (j = 0; j < 3; j++)
- decrRefCount(argv[j]);
- }
-
- /* We want to perform a single write. This should be guaranteed atomic
- * at least if the filesystem we are writing is a real physical one.
- * While this will save us against the server being killed I don't think
- * there is much to do about the whole server stopping for power problems
- * or alike */
- nwritten = write(server.appendfd,buf,sdslen(buf));
- if (nwritten != (signed)sdslen(buf)) {
- /* Ooops, we are in troubles. The best thing to do for now is
- * to simply exit instead to give the illusion that everything is
- * working as expected. */
- if (nwritten == -1) {
- redisLog(REDIS_WARNING,"Aborting on error writing to the append-only file: %s",strerror(errno));
- } else {
- redisLog(REDIS_WARNING,"Aborting on short write while writing to the append-only file: %s",strerror(errno));
- }
- abort();
- }
- now = time(NULL);
- if (server.appendfsync == APPENDFSYNC_ALWAYS ||
- (server.appendfsync == APPENDFSYNC_EVERYSEC &&
- now-server.lastfsync > 1))
- {
- fsync(server.appendfd); /* Let's try to get this data on the disk */
- server.lastfsync = now;
- }
-}
-
static void processInputBuffer(redisClient *c) {
again:
if (c->bulklen == -1) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors */
- (void) write(c->fd,err,strlen(err));
+ if (write(c->fd,err,strlen(err)) == -1) {
+ /* Nothing to do, Just to avoid the warning... */
+ }
freeClient(c);
return;
}
len = 1;
buf[0] = (val < 0) ? 255 : 254;
} else {
- snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);
+ snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
buf[0] = strlen((char*)buf);
len = buf[0]+1;
}
eoferr: /* unexpected end of file is handled here with a fatal exit */
if (keyobj) decrRefCount(keyobj);
- redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
+ redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
exit(1);
return REDIS_ERR; /* Just to avoid warning */
}
case REDIS_STRING: type = "+string"; break;
case REDIS_LIST: type = "+list"; break;
case REDIS_SET: type = "+set"; break;
+ case REDIS_ZSET: type = "+zset"; break;
default: type = "unknown"; break;
}
}
}
}
+/* RPOPLPUSH srclist dstlist
+ *
+ * Semantics of the command:
+ *
+ *  IF LLEN(srclist) > 0
+ *    element = RPOP srclist
+ *    LPUSH dstlist element
+ *    RETURN element
+ *  ELSE
+ *    RETURN nil
+ *  END
+ *
+ * The idea is to be able to get an element from a list in a reliable way
+ * since the element is not just returned but pushed against another list
+ * as well. This command was originally proposed by Ezra Zygmuntowicz.
+ *
+ * Replies: a null bulk when the source key is missing or its list is
+ * empty, a wrong type error when either key holds a non-list value,
+ * otherwise the moved element as a bulk reply.
+ */
+static void rpoplpushcommand(redisClient *c) {
+    robj *source, *target, *moved;
+    list *from, *to;
+    listNode *tail;
+
+    source = lookupKeyWrite(c->db,c->argv[1]);
+    if (source == NULL) {
+        addReply(c,shared.nullbulk);
+        return;
+    }
+    if (source->type != REDIS_LIST) {
+        addReply(c,shared.wrongtypeerr);
+        return;
+    }
+
+    from = source->ptr;
+    tail = listLast(from);
+    if (tail == NULL) {
+        /* The source list exists but holds no elements */
+        addReply(c,shared.nullbulk);
+        return;
+    }
+
+    target = lookupKeyWrite(c->db,c->argv[2]);
+    moved = listNodeValue(tail);
+    if (target == NULL) {
+        /* Create the list if the key does not exist */
+        target = createListObject();
+        dictAdd(c->db->dict,c->argv[2],target);
+        incrRefCount(c->argv[2]);
+    } else if (target->type != REDIS_LIST) {
+        addReply(c,shared.wrongtypeerr);
+        return;
+    }
+
+    /* Add the element to the target list, taking a new reference to it */
+    to = target->ptr;
+    listAddNodeHead(to,moved);
+    incrRefCount(moved);
+
+    /* Send the element to the client as reply as well */
+    addReplyBulkLen(c,moved);
+    addReply(c,moved);
+    addReply(c,shared.crlf);
+
+    /* Finally remove the element from the source list */
+    listDelNode(from,tail);
+    server.dirty++;
+}
+
+
/* ==================================== Sets ================================ */
static void saddCommand(redisClient *c) {
o = lookupKeyRead(c->db,c->argv[1]);
if (o == NULL) {
- addReply(c,shared.czero);
+ addReply(c,shared.nullbulk);
return;
} else {
if (o->type != REDIS_ZSET) {
char buf[128];
double *score = dictGetEntryVal(de);
- snprintf(buf,sizeof(buf),"%.16g",*score);
+ snprintf(buf,sizeof(buf),"%.17g",*score);
addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
strlen(buf),buf));
}
int limit_start = 0, limit_count = -1, start, end;
int j, dontsort = 0, vectorlen;
int getop = 0; /* GET operation counter */
- robj *sortval, *sortby = NULL;
+ robj *sortval, *sortby = NULL, *storekey = NULL;
redisSortObject *vector; /* Resulting vector to sort */
/* Lookup the key to sort. It must be of the right types */
limit_start = atoi(c->argv[j+1]->ptr);
limit_count = atoi(c->argv[j+2]->ptr);
j+=2;
+ } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
+ storekey = c->argv[j+1];
+ j++;
} else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
sortby = c->argv[j+1];
/* If the BY pattern does not contain '*', i.e. it is constant,
REDIS_SORT_GET,c->argv[j+1]));
getop++;
j++;
- } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
- listAddNodeTail(operations,createSortOperation(
- REDIS_SORT_DEL,c->argv[j+1]));
- j++;
- } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
- listAddNodeTail(operations,createSortOperation(
- REDIS_SORT_INCR,c->argv[j+1]));
- j++;
- } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
- listAddNodeTail(operations,createSortOperation(
- REDIS_SORT_DECR,c->argv[j+1]));
- j++;
} else {
decrRefCount(sortval);
listRelease(operations);
/* Send command output to the output buffer, performing the specified
* GET/DEL/INCR/DECR operations if any. */
outputlen = getop ? getop*(end-start+1) : end-start+1;
- addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
- for (j = start; j <= end; j++) {
- listNode *ln;
- if (!getop) {
- addReplyBulkLen(c,vector[j].obj);
- addReply(c,vector[j].obj);
- addReply(c,shared.crlf);
+ if (storekey == NULL) {
+ /* STORE option not specified, send the sorting result to client */
+ addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
+ for (j = start; j <= end; j++) {
+ listNode *ln;
+ if (!getop) {
+ addReplyBulkLen(c,vector[j].obj);
+ addReply(c,vector[j].obj);
+ addReply(c,shared.crlf);
+ }
+ listRewind(operations);
+ while((ln = listYield(operations))) {
+ redisSortOperation *sop = ln->value;
+ robj *val = lookupKeyByPattern(c->db,sop->pattern,
+ vector[j].obj);
+
+ if (sop->type == REDIS_SORT_GET) {
+ if (!val || val->type != REDIS_STRING) {
+ addReply(c,shared.nullbulk);
+ } else {
+ addReplyBulkLen(c,val);
+ addReply(c,val);
+ addReply(c,shared.crlf);
+ }
+ } else {
+ assert(sop->type == REDIS_SORT_GET); /* always fails */
+ }
+ }
}
- listRewind(operations);
- while((ln = listYield(operations))) {
- redisSortOperation *sop = ln->value;
- robj *val = lookupKeyByPattern(c->db,sop->pattern,
- vector[j].obj);
+ } else {
+ robj *listObject = createListObject();
+ list *listPtr = (list*) listObject->ptr;
- if (sop->type == REDIS_SORT_GET) {
- if (!val || val->type != REDIS_STRING) {
- addReply(c,shared.nullbulk);
+ /* STORE option specified, set the sorting result as a List object */
+ for (j = start; j <= end; j++) {
+ listNode *ln;
+ if (!getop) {
+ listAddNodeTail(listPtr,vector[j].obj);
+ incrRefCount(vector[j].obj);
+ }
+ listRewind(operations);
+ while((ln = listYield(operations))) {
+ redisSortOperation *sop = ln->value;
+ robj *val = lookupKeyByPattern(c->db,sop->pattern,
+ vector[j].obj);
+
+ if (sop->type == REDIS_SORT_GET) {
+ if (!val || val->type != REDIS_STRING) {
+ listAddNodeTail(listPtr,createStringObject("",0));
+ } else {
+ listAddNodeTail(listPtr,val);
+ incrRefCount(val);
+ }
} else {
- addReplyBulkLen(c,val);
- addReply(c,val);
- addReply(c,shared.crlf);
+ assert(sop->type == REDIS_SORT_GET); /* always fails */
}
- } else if (sop->type == REDIS_SORT_DEL) {
- /* TODO */
}
}
+ if (dictReplace(c->db->dict,storekey,listObject)) {
+ incrRefCount(storekey);
+ }
+ /* Note: we add 1 because the DB is dirty anyway since even if the
+ * SORT result is empty a new key is set and maybe the old content
+ * replaced. */
+ server.dirty += 1+outputlen;
+ addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
}
/* Cleanup */
}
static int syncWithMaster(void) {
- char buf[1024], tmpfile[256];
+ char buf[1024], tmpfile[256], authcmd[1024];
int dumpsize;
int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
int dfd;
strerror(errno));
return REDIS_ERR;
}
+
+ /* AUTH with the master if required. */
+ if(server.masterauth) {
+ snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
+ if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
+ close(fd);
+ redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
+ strerror(errno));
+ return REDIS_ERR;
+ }
+ /* Read the AUTH result. */
+ if (syncReadLine(fd,buf,1024,3600) == -1) {
+ close(fd);
+ redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
+ strerror(errno));
+ return REDIS_ERR;
+ }
+ if (buf[0] != '+') {
+ close(fd);
+ redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
+ return REDIS_ERR;
+ }
+ }
+
/* Issue the SYNC command */
if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
close(fd);
}
}
+/* ============================== Append Only file ========================== */
+
+/* Append the command 'cmd', executed against the DB with id 'dictid' with
+ * the arguments 'argv'/'argc', to the append only file.
+ *
+ * A SELECT is emitted first when 'dictid' differs from the DB of the last
+ * appended command. EXPIRE is rewritten as EXPIREAT with an absolute time.
+ * The whole entry is written with a single write() call, then fsync() is
+ * issued according to server.appendfsync. On write error or short write a
+ * warning is logged and the process exits. */
+static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
+    sds buf = sdsempty();
+    int j;
+    ssize_t nwritten;
+    time_t now;
+    robj *tmpargv[3];
+
+    /* The DB this command was targetting is not the same as the last command
+     * we appendend. To issue a SELECT command is needed. */
+    if (dictid != server.appendseldb) {
+        char seldb[64];
+
+        snprintf(seldb,sizeof(seldb),"%d",dictid);
+        /* strlen() returns size_t: print it with a matching specifier */
+        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
+            (unsigned long)strlen(seldb),seldb);
+        server.appendseldb = dictid;
+    }
+
+    /* "Fix" the argv vector if the command is EXPIRE. We want to translate
+     * EXPIREs into EXPIREATs calls */
+    if (cmd->proc == expireCommand) {
+        long when;
+
+        tmpargv[0] = createStringObject("EXPIREAT",8);
+        tmpargv[1] = argv[1];
+        incrRefCount(argv[1]);
+        when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
+        tmpargv[2] = createObject(REDIS_STRING,
+            sdscatprintf(sdsempty(),"%ld",when));
+        argv = tmpargv;
+    }
+
+    /* Append the actual command */
+    buf = sdscatprintf(buf,"*%d\r\n",argc);
+    for (j = 0; j < argc; j++) {
+        robj *o = argv[j];
+
+        if (o->encoding != REDIS_ENCODING_RAW)
+            o = getDecodedObject(o);
+        buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
+        buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
+        buf = sdscatlen(buf,"\r\n",2);
+        /* Release the temporary decoded copy, if one was created */
+        if (o != argv[j])
+            decrRefCount(o);
+    }
+
+    /* Free the objects from the modified argv for EXPIREAT */
+    if (cmd->proc == expireCommand) {
+        for (j = 0; j < 3; j++)
+            decrRefCount(argv[j]);
+    }
+
+    /* We want to perform a single write. This should be guaranteed atomic
+     * at least if the filesystem we are writing is a real physical one.
+     * While this will save us against the server being killed I don't think
+     * there is much to do about the whole server stopping for power problems
+     * or alike */
+    nwritten = write(server.appendfd,buf,sdslen(buf));
+    if (nwritten != (signed)sdslen(buf)) {
+        /* Ooops, we are in troubles. The best thing to do for now is
+         * to simply exit instead to give the illusion that everything is
+         * working as expected. */
+        if (nwritten == -1) {
+            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
+        } else {
+            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
+        }
+        exit(1);
+    }
+    /* The entry is on its way to disk: release the temporary buffer,
+     * otherwise it is leaked for every command that gets logged. */
+    sdsfree(buf);
+
+    now = time(NULL);
+    if (server.appendfsync == APPENDFSYNC_ALWAYS ||
+        (server.appendfsync == APPENDFSYNC_EVERYSEC &&
+         now-server.lastfsync > 1))
+    {
+        fsync(server.appendfd); /* Let's try to get this data on the disk */
+        server.lastfsync = now;
+    }
+}
+
+/* Commands in Redis always run in the context of a client, so replaying
+ * the append only file requires a client that is not bound to any socket.
+ * This function allocates and returns such a fake client: fd is -1, the
+ * selected DB is 0, the query buffer and the reply list are empty. */
+static struct redisClient *createFakeClient(void) {
+    struct redisClient *fake = zmalloc(sizeof(struct redisClient));
+
+    selectDb(fake,0);
+
+    /* No socket and no pending arguments */
+    fake->fd = -1;
+    fake->argv = NULL;
+    fake->argc = 0;
+    fake->flags = 0;
+
+    /* We set the fake client as a slave waiting for the synchronization
+     * so that Redis will not try to send replies to this client. */
+    fake->replstate = REDIS_REPL_WAIT_BGSAVE_START;
+
+    /* Empty input buffer and empty reply list */
+    fake->querybuf = sdsempty();
+    fake->reply = listCreate();
+    listSetFreeMethod(fake->reply,decrRefCount);
+    listSetDupMethod(fake->reply,dupClientReplyValue);
+
+    return fake;
+}
+
+/* Release a fake client: its reply list, its query buffer and finally the
+ * client structure itself. */
+static void freeFakeClient(struct redisClient *c) {
+    listRelease(c->reply);
+    sdsfree(c->querybuf);
+    zfree(c);
+}
+
+/* Replay the append log file 'filename'. On success REDIS_OK is returned.
+ * On non fatal error (the append only file is zero-length) REDIS_ERR is
+ * returned. On fatal error (the file can't be opened, read error, bad
+ * format, unknown command) an error message is logged and the program
+ * exits. */
+int loadAppendOnlyFile(char *filename) {
+    struct redisClient *fakeClient;
+    FILE *fp = fopen(filename,"r");
+    struct redis_stat sb;
+
+    /* The stream must be validated before fileno() is called on it */
+    if (fp == NULL) {
+        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
+        exit(1);
+    }
+
+    if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
+        fclose(fp); /* don't leak the stream on the non fatal path */
+        return REDIS_ERR;
+    }
+
+    fakeClient = createFakeClient();
+    while(1) {
+        int argc, j;
+        long len;
+        robj **argv;
+        char buf[128];
+        sds argsds;
+        struct redisCommand *cmd;
+
+        /* Every command starts with "*<argc>". A clean EOF here is the
+         * normal end of the file. */
+        if (fgets(buf,sizeof(buf),fp) == NULL) {
+            if (feof(fp))
+                break;
+            else
+                goto readerr;
+        }
+        if (buf[0] != '*') goto fmterr;
+        argc = atoi(buf+1);
+        /* At least the command name is needed: argv[0] is used below */
+        if (argc < 1) goto fmterr;
+        argv = zmalloc(sizeof(robj*)*argc);
+        for (j = 0; j < argc; j++) {
+            /* Every argument is "$<len>\r\n<payload>\r\n" */
+            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
+            if (buf[0] != '$') goto fmterr;
+            len = strtol(buf+1,NULL,10);
+            if (len < 0) goto fmterr;
+            argsds = sdsnewlen(NULL,len);
+            /* fread() with a zero size always returns 0, so an empty
+             * argument must not be treated as a format error. */
+            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
+            argv[j] = createObject(REDIS_STRING,argsds);
+            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
+        }
+
+        /* Command lookup */
+        cmd = lookupCommand(argv[0]->ptr);
+        if (!cmd) {
+            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
+            exit(1);
+        }
+        /* Try object sharing and encoding */
+        if (server.shareobjects) {
+            for(j = 1; j < argc; j++)
+                argv[j] = tryObjectSharing(argv[j]);
+        }
+        if (cmd->flags & REDIS_CMD_BULK)
+            tryObjectEncoding(argv[argc-1]);
+        /* Run the command in the context of a fake client */
+        fakeClient->argc = argc;
+        fakeClient->argv = argv;
+        cmd->proc(fakeClient);
+        /* Discard the reply objects list from the fake client */
+        while(listLength(fakeClient->reply))
+            listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
+        /* Clean up, ready for the next command */
+        for (j = 0; j < argc; j++) decrRefCount(argv[j]);
+        zfree(argv);
+    }
+    fclose(fp);
+    freeFakeClient(fakeClient);
+    return REDIS_OK;
+
+readerr:
+    if (feof(fp)) {
+        redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
+    } else {
+        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
+    }
+    exit(1);
+fmterr:
+    redisLog(REDIS_WARNING,"Bad file format reading the append only file");
+    exit(1);
+}
+
/* ================================= Debugging ============================== */
static void debugCommand(redisClient *c) {
}
}
-#ifdef HAVE_BACKTRACE
-static struct redisFunctionSym symsTable[] = {
-{"compareStringObjects", (unsigned long)compareStringObjects},
-{"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
-{"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
-{"dictEncObjHash", (unsigned long)dictEncObjHash},
-{"incrDecrCommand", (unsigned long)incrDecrCommand},
-{"freeStringObject", (unsigned long)freeStringObject},
-{"freeListObject", (unsigned long)freeListObject},
-{"freeSetObject", (unsigned long)freeSetObject},
-{"decrRefCount", (unsigned long)decrRefCount},
-{"createObject", (unsigned long)createObject},
-{"freeClient", (unsigned long)freeClient},
-{"rdbLoad", (unsigned long)rdbLoad},
-{"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
-{"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
-{"addReply", (unsigned long)addReply},
-{"addReplySds", (unsigned long)addReplySds},
-{"incrRefCount", (unsigned long)incrRefCount},
-{"rdbSaveBackground", (unsigned long)rdbSaveBackground},
-{"createStringObject", (unsigned long)createStringObject},
-{"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
-{"syncWithMaster", (unsigned long)syncWithMaster},
-{"tryObjectSharing", (unsigned long)tryObjectSharing},
-{"tryObjectEncoding", (unsigned long)tryObjectEncoding},
-{"getDecodedObject", (unsigned long)getDecodedObject},
-{"removeExpire", (unsigned long)removeExpire},
-{"expireIfNeeded", (unsigned long)expireIfNeeded},
-{"deleteIfVolatile", (unsigned long)deleteIfVolatile},
-{"deleteKey", (unsigned long)deleteKey},
-{"getExpire", (unsigned long)getExpire},
-{"setExpire", (unsigned long)setExpire},
-{"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
-{"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
-{"authCommand", (unsigned long)authCommand},
-{"pingCommand", (unsigned long)pingCommand},
-{"echoCommand", (unsigned long)echoCommand},
-{"setCommand", (unsigned long)setCommand},
-{"setnxCommand", (unsigned long)setnxCommand},
-{"getCommand", (unsigned long)getCommand},
-{"delCommand", (unsigned long)delCommand},
-{"existsCommand", (unsigned long)existsCommand},
-{"incrCommand", (unsigned long)incrCommand},
-{"decrCommand", (unsigned long)decrCommand},
-{"incrbyCommand", (unsigned long)incrbyCommand},
-{"decrbyCommand", (unsigned long)decrbyCommand},
-{"selectCommand", (unsigned long)selectCommand},
-{"randomkeyCommand", (unsigned long)randomkeyCommand},
-{"keysCommand", (unsigned long)keysCommand},
-{"dbsizeCommand", (unsigned long)dbsizeCommand},
-{"lastsaveCommand", (unsigned long)lastsaveCommand},
-{"saveCommand", (unsigned long)saveCommand},
-{"bgsaveCommand", (unsigned long)bgsaveCommand},
-{"shutdownCommand", (unsigned long)shutdownCommand},
-{"moveCommand", (unsigned long)moveCommand},
-{"renameCommand", (unsigned long)renameCommand},
-{"renamenxCommand", (unsigned long)renamenxCommand},
-{"lpushCommand", (unsigned long)lpushCommand},
-{"rpushCommand", (unsigned long)rpushCommand},
-{"lpopCommand", (unsigned long)lpopCommand},
-{"rpopCommand", (unsigned long)rpopCommand},
-{"llenCommand", (unsigned long)llenCommand},
-{"lindexCommand", (unsigned long)lindexCommand},
-{"lrangeCommand", (unsigned long)lrangeCommand},
-{"ltrimCommand", (unsigned long)ltrimCommand},
-{"typeCommand", (unsigned long)typeCommand},
-{"lsetCommand", (unsigned long)lsetCommand},
-{"saddCommand", (unsigned long)saddCommand},
-{"sremCommand", (unsigned long)sremCommand},
-{"smoveCommand", (unsigned long)smoveCommand},
-{"sismemberCommand", (unsigned long)sismemberCommand},
-{"scardCommand", (unsigned long)scardCommand},
-{"spopCommand", (unsigned long)spopCommand},
-{"srandmemberCommand", (unsigned long)srandmemberCommand},
-{"sinterCommand", (unsigned long)sinterCommand},
-{"sinterstoreCommand", (unsigned long)sinterstoreCommand},
-{"sunionCommand", (unsigned long)sunionCommand},
-{"sunionstoreCommand", (unsigned long)sunionstoreCommand},
-{"sdiffCommand", (unsigned long)sdiffCommand},
-{"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
-{"syncCommand", (unsigned long)syncCommand},
-{"flushdbCommand", (unsigned long)flushdbCommand},
-{"flushallCommand", (unsigned long)flushallCommand},
-{"sortCommand", (unsigned long)sortCommand},
-{"lremCommand", (unsigned long)lremCommand},
-{"infoCommand", (unsigned long)infoCommand},
-{"mgetCommand", (unsigned long)mgetCommand},
-{"monitorCommand", (unsigned long)monitorCommand},
-{"expireCommand", (unsigned long)expireCommand},
-{"expireatCommand", (unsigned long)expireatCommand},
-{"getsetCommand", (unsigned long)getsetCommand},
-{"ttlCommand", (unsigned long)ttlCommand},
-{"slaveofCommand", (unsigned long)slaveofCommand},
-{"debugCommand", (unsigned long)debugCommand},
-{"processCommand", (unsigned long)processCommand},
-{"setupSigSegvAction", (unsigned long)setupSigSegvAction},
-{"readQueryFromClient", (unsigned long)readQueryFromClient},
-{"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
-{"msetGenericCommand", (unsigned long)msetGenericCommand},
-{"msetCommand", (unsigned long)msetCommand},
-{"msetnxCommand", (unsigned long)msetnxCommand},
-{"zslCreateNode", (unsigned long)zslCreateNode},
-{"zslCreate", (unsigned long)zslCreate},
-{"zslFreeNode",(unsigned long)zslFreeNode},
-{"zslFree",(unsigned long)zslFree},
-{"zslRandomLevel",(unsigned long)zslRandomLevel},
-{"zslInsert",(unsigned long)zslInsert},
-{"zslDelete",(unsigned long)zslDelete},
-{"createZsetObject",(unsigned long)createZsetObject},
-{"zaddCommand",(unsigned long)zaddCommand},
-{"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
-{"zrangeCommand",(unsigned long)zrangeCommand},
-{"zrevrangeCommand",(unsigned long)zrevrangeCommand},
-{"zremCommand",(unsigned long)zremCommand},
-{"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
-{"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
-{"feedAppendOnlyFile",(unsigned long)feedAppendOnlyFile},
-{NULL,0}
-};
+/* =================================== Main! ================================ */
-/* This function try to convert a pointer into a function name. It's used in
- * oreder to provide a backtrace under segmentation fault that's able to
- * display functions declared as static (otherwise the backtrace is useless). */
-static char *findFuncName(void *pointer, unsigned long *offset){
- int i, ret = -1;
- unsigned long off, minoff = 0;
+#ifdef __linux__
+/* Read the current Linux overcommit policy from
+ * /proc/sys/vm/overcommit_memory.
+ *
+ * Returns the integer parsed from the first line of that file, or -1 when
+ * the file can't be opened or read. */
+int linuxOvercommitMemoryValue(void) {
+    char line[64];
+    int value = -1;
+    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");
+
- /* Try to match against the Symbol with the smallest offset */
- for (i=0; symsTable[i].pointer; i++) {
- unsigned long lp = (unsigned long) pointer;
+    if (procfile == NULL) return value;
+    /* Only the first line matters; on a read error keep the -1 default. */
+    if (fgets(line,sizeof(line),procfile) != NULL) value = atoi(line);
+    fclose(procfile);
- if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
- off=lp-symsTable[i].pointer;
- if (ret < 0 || off < minoff) {
- minoff=off;
- ret=i;
- }
- }
+    return value;
+}
+
+/* Log a warning when the kernel overcommit policy read by
+ * linuxOvercommitMemoryValue() is 0. Any other value, including the -1
+ * returned when the policy can't be read, is silently accepted. */
+void linuxOvercommitMemoryWarning(void) {
+    int policy = linuxOvercommitMemoryValue();
+
+    if (policy == 0) {
+        redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
}
- if (ret == -1) return NULL;
- *offset = minoff;
- return symsTable[ret].name;
+}
+#endif /* __linux__ */
+
+/* Turn the running process into a background daemon.
+ *
+ * The process forks: the parent exits with status 0 and the child carries
+ * on in a new session. Standard input, output and error of the child are
+ * redirected to /dev/null, then the child pid is written to server.pidfile
+ * (best effort: a pid file that can't be opened is silently skipped).
+ *
+ * If fork() itself fails the error is logged and the process exits with
+ * status 1, so the caller's exit code never reports success when no daemon
+ * was started. */
+static void daemonize(void) {
+    int fd;
+    FILE *fp;
+    pid_t pid = fork();
+
+    if (pid == -1) {
+        /* No child exists: exiting with 0 here would look like success. */
+        redisLog(REDIS_WARNING,"Can't daemonize, fork failed: %s", strerror(errno));
+        exit(1);
+    }
+    if (pid != 0) exit(0); /* parent exits */
+    setsid(); /* create a new session */
+
+    /* Every output goes to /dev/null. If Redis is daemonized but
+     * the 'logfile' is set to 'stdout' in the configuration file
+     * it will not log at all. */
+    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
+        dup2(fd, STDIN_FILENO);
+        dup2(fd, STDOUT_FILENO);
+        dup2(fd, STDERR_FILENO);
+        /* Don't close the descriptor if it is one of the standard three. */
+        if (fd > STDERR_FILENO) close(fd);
+    }
+    /* Try to write the pid file */
+    fp = fopen(server.pidfile,"w");
+    if (fp) {
+        /* pid_t is not guaranteed to be int: cast to match the %d format. */
+        fprintf(fp,"%d\n",(int)getpid());
+        fclose(fp);
+    }
+}
+
+/* Server entry point.
+ *
+ * With a single command line argument it is used as the path of the
+ * configuration file; with more than one the usage string is printed on
+ * stderr and the process exits with status 1; with none the default
+ * configuration is kept and a warning is logged.
+ *
+ * After initialization (and daemonization when server.daemonize is set)
+ * the dataset is loaded from the append only file if server.appendonly is
+ * set, otherwise from the file named by server.dbfilename. Finally the
+ * accept handler is registered on the listening socket and the event loop
+ * runs until it returns. */
+int main(int argc, char **argv) {
+    initServerConfig();
+    if (argc > 2) {
+        fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
+        exit(1);
+    }
+    if (argc == 2) {
+        resetServerSaveParams();
+        loadServerConfig(argv[1]);
+    } else {
+        redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
+    }
+    initServer();
+    if (server.daemonize) daemonize();
+    redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
+#ifdef __linux__
+    linuxOvercommitMemoryWarning();
+#endif
+    /* Exactly one of the two persistence files is used to load the data. */
+    if (!server.appendonly) {
+        if (rdbLoad(server.dbfilename) == REDIS_OK)
+            redisLog(REDIS_NOTICE,"DB loaded from disk");
+    } else if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK) {
+        redisLog(REDIS_NOTICE,"DB loaded from append only file");
+    }
+    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
+        acceptHandler, NULL, NULL) == AE_ERR)
+    {
+        oom("creating file event");
+    }
+    redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+    aeMain(server.el);
+    aeDeleteEventLoop(server.el);
+    return 0;
}
+/* ============================= Backtrace support ========================= */
+
+#ifdef HAVE_BACKTRACE
+static char *findFuncName(void *pointer, unsigned long *offset);
+
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
return (void*) uc->uc_mcontext.mc_eip;
sigaction (SIGBUS, &act, NULL);
return;
}
-#else /* HAVE_BACKTRACE */
-static void setupSigSegvAction(void) {
-}
-#endif /* HAVE_BACKTRACE */
-/* =================================== Main! ================================ */
+#include "staticsymbols.h"
+/* Translate a code address into the name of the function containing it.
+ * It is used in order to provide a backtrace under segmentation fault
+ * that is able to display functions declared as static (otherwise the
+ * backtrace is useless).
+ *
+ * symsTable is scanned up to its terminating entry (the one whose pointer
+ * is zero). Among the entries whose address is not greater than 'pointer'
+ * the one with the smallest distance wins; on ties the first one found is
+ * kept. On success the symbol name is returned and the distance is stored
+ * in '*offset'. If nothing matches NULL is returned and '*offset' is left
+ * untouched. */
+static char *findFuncName(void *pointer, unsigned long *offset){
+    unsigned long addr = (unsigned long) pointer;
+    unsigned long bestoff = 0;
+    int best = -1, j;
+
-#ifdef __linux__
-int linuxOvercommitMemoryValue(void) {
- FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
- char buf[64];
+    /* Look for the symbol starting closest to (but not after) addr */
+    for (j = 0; symsTable[j].pointer; j++) {
+        unsigned long delta;
+
- if (!fp) return -1;
- if (fgets(buf,64,fp) == NULL) {
- fclose(fp);
- return -1;
+        /* Skip the invalid address marker and symbols placed after addr. */
+        if (addr == (unsigned long)-1 || addr < symsTable[j].pointer) continue;
+        delta = addr-symsTable[j].pointer;
+        /* Keep the current candidate unless this one is strictly closer. */
+        if (best >= 0 && delta >= bestoff) continue;
+        bestoff = delta;
+        best = j;
}
- fclose(fp);
-
- return atoi(buf);
+    if (best == -1) return NULL;
+    *offset = bestoff;
+    return symsTable[best].name;
}
-
-void linuxOvercommitMemoryWarning(void) {
- if (linuxOvercommitMemoryValue() == 0) {
- redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
- }
+#else /* HAVE_BACKTRACE */
+static void setupSigSegvAction(void) { /* Variant compiled when HAVE_BACKTRACE is not defined: the body is intentionally empty. */
}
-#endif /* __linux__ */
+#endif /* HAVE_BACKTRACE */
-static void daemonize(void) {
- int fd;
- FILE *fp;
- if (fork() != 0) exit(0); /* parent exits */
- setsid(); /* create a new session */
- /* Every output goes to /dev/null. If Redis is daemonized but
- * the 'logfile' is set to 'stdout' in the configuration file
- * it will not log at all. */
- if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
- dup2(fd, STDIN_FILENO);
- dup2(fd, STDOUT_FILENO);
- dup2(fd, STDERR_FILENO);
- if (fd > STDERR_FILENO) close(fd);
- }
- /* Try to write the pid file */
- fp = fopen(server.pidfile,"w");
- if (fp) {
- fprintf(fp,"%d\n",getpid());
- fclose(fp);
- }
-}
+/* The End */
+
+
-int main(int argc, char **argv) {
- initServerConfig();
- if (argc == 2) {
- ResetServerSaveParams();
- loadServerConfig(argv[1]);
- } else if (argc > 2) {
- fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
- exit(1);
- } else {
- redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
- }
- initServer();
- if (server.daemonize) daemonize();
- redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
-#ifdef __linux__
- linuxOvercommitMemoryWarning();
-#endif
- if (rdbLoad(server.dbfilename) == REDIS_OK)
- redisLog(REDIS_NOTICE,"DB loaded from disk");
- if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
- acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
- redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
- aeMain(server.el);
- aeDeleteEventLoop(server.el);
- return 0;
-}