X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/6c446631dac7a7fe9e34b18392d4a8498c8c9bc3..7c49733ce3f550a96f60a9213911fdc9265cedc8:/redis.c diff --git a/redis.c b/redis.c index 9221dc0d..61bb81cf 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "1.06" +#define REDIS_VERSION "1.1.91" #include "fmacros.h" #include "config.h" @@ -198,6 +198,10 @@ #define APPENDFSYNC_ALWAYS 1 #define APPENDFSYNC_EVERYSEC 2 +/* We can print the stacktrace, so our assert is defined this way: */ +#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1))) +static void _redisAssert(char *estr); + /*================================= Data types ============================== */ /* A redis object, that is a type able to hold a string / list / set */ @@ -209,6 +213,17 @@ typedef struct redisObject { int refcount; } robj; +/* Macro used to initalize a Redis object allocated on the stack. + * Note that this macro is taken near the structure definition to make sure + * we'll update it when the structure is changed, to avoid bugs like + * bug #85 introduced exactly in this way. */ +#define initStaticStringObject(_var,_ptr) do { \ + _var.refcount = 1; \ + _var.type = REDIS_STRING; \ + _var.encoding = REDIS_ENCODING_RAW; \ + _var.ptr = _ptr; \ +} while(0); + typedef struct redisDb { dict *dict; dict *expires; @@ -275,8 +290,9 @@ struct redisServer { int appendfd; int appendseldb; char *pidfile; - int bgsaveinprogress; pid_t bgsavechildpid; + pid_t bgrewritechildpid; + sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ struct saveparam *saveparams; int saveparamslen; char *logfile; @@ -383,7 +399,7 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv static int syncWithMaster(void); static robj *tryObjectSharing(robj *o); static int tryObjectEncoding(robj *o); -static robj *getDecodedObject(const robj *o); +static robj *getDecodedObject(robj *o); static int removeExpire(redisDb *db, robj *key); static int expireIfNeeded(redisDb *db, robj *key); static int deleteIfVolatile(redisDb *db, robj *key); @@ -395,6 +411,7 @@ static void freeMemoryIfNeeded(void); static int processCommand(redisClient *c); static void setupSigSegvAction(void); static void rdbRemoveTempFile(pid_t childpid); +static void aofRemoveTempFile(pid_t childpid); static size_t stringObjectLen(robj *o); static void processInputBuffer(redisClient *c); static zskiplist *zslCreate(void); @@ -421,6 +438,7 @@ static void dbsizeCommand(redisClient *c); static void lastsaveCommand(redisClient *c); static void saveCommand(redisClient *c); static void bgsaveCommand(redisClient *c); +static void bgrewriteaofCommand(redisClient *c); static void shutdownCommand(redisClient *c); static void moveCommand(redisClient *c); static void renameCommand(redisClient *c); @@ -498,7 +516,7 @@ static struct redisCommand cmdTable[] = { {"lrange",lrangeCommand,4,REDIS_CMD_INLINE}, {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE}, {"lrem",lremCommand,4,REDIS_CMD_BULK}, - {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK}, + {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"srem",sremCommand,3,REDIS_CMD_BULK}, {"smove",smoveCommand,4,REDIS_CMD_BULK}, @@ -518,7 +536,7 @@ static struct redisCommand cmdTable[] = { {"zrem",zremCommand,3,REDIS_CMD_BULK}, {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE}, {"zrange",zrangeCommand,4,REDIS_CMD_INLINE}, - {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE}, + {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE}, {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE}, {"zcard",zcardCommand,2,REDIS_CMD_INLINE}, {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, @@ -541,6 +559,7 @@ static struct redisCommand cmdTable[] = { {"echo",echoCommand,2,REDIS_CMD_BULK}, {"save",saveCommand,1,REDIS_CMD_INLINE}, {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE}, + {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE}, {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE}, {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE}, {"type",typeCommand,2,REDIS_CMD_INLINE}, @@ -752,37 +771,24 @@ static unsigned int dictObjHash(const void *key) { static int dictEncObjKeyCompare(void *privdata, const void *key1, const void *key2) { - const robj *o1 = key1, *o2 = key2; - - if (o1->encoding == REDIS_ENCODING_RAW && - o2->encoding == REDIS_ENCODING_RAW) - return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr); - else { - robj *dec1, *dec2; - int cmp; + robj *o1 = (robj*) key1, *o2 = (robj*) key2; + int cmp; - dec1 = o1->encoding != REDIS_ENCODING_RAW ? - getDecodedObject(o1) : (robj*)o1; - dec2 = o2->encoding != REDIS_ENCODING_RAW ? - getDecodedObject(o2) : (robj*)o2; - cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr); - if (dec1 != o1) decrRefCount(dec1); - if (dec2 != o2) decrRefCount(dec2); - return cmp; - } + o1 = getDecodedObject(o1); + o2 = getDecodedObject(o2); + cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr); + decrRefCount(o1); + decrRefCount(o2); + return cmp; } static unsigned int dictEncObjHash(const void *key) { - const robj *o = key; + robj *o = (robj*) key; - if (o->encoding == REDIS_ENCODING_RAW) - return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); - else { - robj *dec = getDecodedObject(o); - unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr)); - decrRefCount(dec); - return hash; - } + o = getDecodedObject(o); + unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); + decrRefCount(o); + return hash; } static dictType setDictType = { @@ -800,7 +806,7 @@ static dictType zsetDictType = { NULL, /* val dup */ dictEncObjKeyCompare, /* key compare */ dictRedisObjectDestructor, /* key destructor */ - dictVanillaFree /* val destructor */ + dictVanillaFree /* val destructor of malloc(sizeof(double)) */ }; static dictType hashDictType = { @@ -820,8 +826,7 @@ static dictType hashDictType = { * is based on heap allocation for send buffers, so we simply abort. * At least the code will be simpler to read... */ static void oom(const char *msg) { - fprintf(stderr, "%s: Out of memory\n",msg); - fflush(stderr); + redisLog(REDIS_WARNING, "%s: Out of memory\n",msg); sleep(1); abort(); } @@ -869,6 +874,90 @@ static void tryResizeHashTables(void) { } } +/* A background saving child (BGSAVE) terminated its work. Handle this. */ +void backgroundSaveDoneHandler(int statloc) { + int exitcode = WEXITSTATUS(statloc); + int bysignal = WIFSIGNALED(statloc); + + if (!bysignal && exitcode == 0) { + redisLog(REDIS_NOTICE, + "Background saving terminated with success"); + server.dirty = 0; + server.lastsave = time(NULL); + } else if (!bysignal && exitcode != 0) { + redisLog(REDIS_WARNING, "Background saving error"); + } else { + redisLog(REDIS_WARNING, + "Background saving terminated by signal"); + rdbRemoveTempFile(server.bgsavechildpid); + } + server.bgsavechildpid = -1; + /* Possibly there are slaves waiting for a BGSAVE in order to be served + * (the first stage of SYNC is a bulk transfer of dump.rdb) */ + updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR); +} + +/* A background append only file rewriting (BGREWRITEAOF) terminated its work. + * Handle this. */ +void backgroundRewriteDoneHandler(int statloc) { + int exitcode = WEXITSTATUS(statloc); + int bysignal = WIFSIGNALED(statloc); + + if (!bysignal && exitcode == 0) { + int fd; + char tmpfile[256]; + + redisLog(REDIS_NOTICE, + "Background append only file rewriting terminated with success"); + /* Now it's time to flush the differences accumulated by the parent */ + snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid); + fd = open(tmpfile,O_WRONLY|O_APPEND); + if (fd == -1) { + redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno)); + goto cleanup; + } + /* Flush our data... */ + if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) != + (signed) sdslen(server.bgrewritebuf)) { + redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno)); + close(fd); + goto cleanup; + } + redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf)); + /* Now our work is to rename the temp file into the stable file. And + * switch the file descriptor used by the server for append only. */ + if (rename(tmpfile,server.appendfilename) == -1) { + redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno)); + close(fd); + goto cleanup; + } + /* Mission completed... almost */ + redisLog(REDIS_NOTICE,"Append only file successfully rewritten."); + if (server.appendfd != -1) { + /* If append only is actually enabled... */ + close(server.appendfd); + server.appendfd = fd; + fsync(fd); + server.appendseldb = -1; /* Make sure it will issue SELECT */ + redisLog(REDIS_NOTICE,"The new append only file was selected for future appends."); + } else { + /* If append only is disabled we just generate a dump in this + * format. Why not? */ + close(fd); + } + } else if (!bysignal && exitcode != 0) { + redisLog(REDIS_WARNING, "Background append only file rewriting error"); + } else { + redisLog(REDIS_WARNING, + "Background append only file rewriting terminated by signal"); + } +cleanup: + sdsfree(server.bgrewritebuf); + server.bgrewritebuf = sdsempty(); + aofRemoveTempFile(server.bgrewritechildpid); + server.bgrewritechildpid = -1; +} + static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); @@ -897,7 +986,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD * if we resize the HT while there is the saving child at work actually * a lot of memory movements in the parent will cause a lot of pages * copied. */ - if (!server.bgsaveinprogress) tryResizeHashTables(); + if (server.bgsavechildpid == -1) tryResizeHashTables(); /* Show information about connected clients */ if (!(loops % 5)) { @@ -912,28 +1001,17 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD if (server.maxidletime && !(loops % 10)) closeTimedoutClients(); - /* Check if a background saving in progress terminated */ - if (server.bgsaveinprogress) { + /* Check if a background saving or AOF rewrite in progress terminated */ + if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) { int statloc; - if (wait3(&statloc,WNOHANG,NULL)) { - int exitcode = WEXITSTATUS(statloc); - int bysignal = WIFSIGNALED(statloc); - - if (!bysignal && exitcode == 0) { - redisLog(REDIS_NOTICE, - "Background saving terminated with success"); - server.dirty = 0; - server.lastsave = time(NULL); - } else if (!bysignal && exitcode != 0) { - redisLog(REDIS_WARNING, "Background saving error"); + pid_t pid; + + if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { + if (pid == server.bgsavechildpid) { + backgroundSaveDoneHandler(statloc); } else { - redisLog(REDIS_WARNING, - "Background saving terminated by signal"); - rdbRemoveTempFile(server.bgsavechildpid); + backgroundRewriteDoneHandler(statloc); } - server.bgsaveinprogress = 0; - server.bgsavechildpid = -1; - updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR); } } else { /* If there is not a background saving in progress check if @@ -1060,7 +1138,7 @@ static void initServerConfig() { server.appendseldb = -1; /* Make sure the first time will not match */ server.pidfile = "/var/run/redis.pid"; server.dbfilename = "dump.rdb"; - server.appendfilename = "appendonly.log"; + server.appendfilename = "appendonly.aof"; server.requirepass = NULL; server.shareobjects = 0; server.sharingpoolsize = 1024; @@ -1112,8 +1190,9 @@ static void initServer() { server.db[j].id = j; } server.cronloops = 0; - server.bgsaveinprogress = 0; server.bgsavechildpid = -1; + server.bgrewritechildpid = -1; + server.bgrewritebuf = sdsempty(); server.lastsave = time(NULL); server.dirty = 0; server.usedmemory = 0; @@ -1332,14 +1411,14 @@ static void freeClient(redisClient *c) { freeClientArgv(c); close(c->fd); ln = listSearchKey(server.clients,c); - assert(ln != NULL); + redisAssert(ln != NULL); listDelNode(server.clients,ln); if (c->flags & REDIS_SLAVE) { if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) close(c->repldbfd); list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves; ln = listSearchKey(l,c); - assert(ln != NULL); + redisAssert(ln != NULL); listDelNode(l,ln); } if (c->flags & REDIS_MASTER) { @@ -1634,7 +1713,10 @@ static int processCommand(redisClient *c) { return 1; } else if ((cmd->arity > 0 && cmd->arity != c->argc) || (c->argc < -cmd->arity)) { - addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n")); + addReplySds(c, + sdscatprintf(sdsempty(), + "-ERR wrong number of arguments for '%s' command\r\n", + cmd->name)); resetClient(c); return 1; } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) { @@ -1722,8 +1804,8 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di robj *lenobj; lenobj = createObject(REDIS_STRING, - sdscatprintf(sdsempty(),"%d\r\n", - stringObjectLen(argv[j]))); + sdscatprintf(sdsempty(),"%lu\r\n", + (unsigned long) stringObjectLen(argv[j]))); lenobj->refcount = 0; outv[outc++] = lenobj; } @@ -1798,6 +1880,7 @@ again: if (sdslen(query) == 0) { /* Ignore empty query */ sdsfree(query); + if (sdslen(c->querybuf)) goto again; return; } argv = sdssplitlen(query,sdslen(query)," ",1,&argc); @@ -1815,10 +1898,16 @@ again: } } zfree(argv); - /* Execute the command. If the client is still valid - * after processCommand() return and there is something - * on the query buffer try to process the next command. */ - if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again; + if (c->argc) { + /* Execute the command. If the client is still valid + * after processCommand() return and there is something + * on the query buffer try to process the next command. */ + if (processCommand(c) && sdslen(c->querybuf)) goto again; + } else { + /* Nothing to process, argc == 0. Just process the query + * buffer if it's not empty or return to the caller */ + if (sdslen(c->querybuf)) goto again; + } return; } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) { redisLog(REDIS_DEBUG, "Client protocol error"); @@ -1926,12 +2015,7 @@ static void addReply(redisClient *c, robj *obj) { c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR) return; - if (obj->encoding != REDIS_ENCODING_RAW) { - obj = getDecodedObject(obj); - } else { - incrRefCount(obj); - } - listAddNodeTail(c->reply,obj); + listAddNodeTail(c->reply,getDecodedObject(obj)); } static void addReplySds(redisClient *c, sds s) { @@ -1944,8 +2028,8 @@ static void addReplyDouble(redisClient *c, double d) { char buf[128]; snprintf(buf,sizeof(buf),"%.17g",d); - addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n", - strlen(buf),buf)); + addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n", + (unsigned long) strlen(buf),buf)); } static void addReplyBulkLen(redisClient *c, robj *obj) { @@ -1965,7 +2049,7 @@ static void addReplyBulkLen(redisClient *c, robj *obj) { len++; } } - addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len)); + addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len)); } static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { @@ -2095,7 +2179,7 @@ static void decrRefCount(void *obj) { case REDIS_SET: freeSetObject(o); break; case REDIS_ZSET: freeZsetObject(o); break; case REDIS_HASH: freeHashObject(o); break; - default: assert(0 != 0); break; + default: redisAssert(0 != 0); break; } if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX || !listAddNodeHead(server.objfreelist,o)) @@ -2140,7 +2224,7 @@ static robj *tryObjectSharing(robj *o) { if (o == NULL || server.shareobjects == 0) return o; - assert(o->type == REDIS_STRING); + redisAssert(o->type == REDIS_STRING); de = dictFind(server.sharingpool,o); if (de) { robj *shared = dictGetEntryKey(de); @@ -2158,7 +2242,7 @@ static robj *tryObjectSharing(robj *o) { if (dictSize(server.sharingpool) >= server.sharingpoolsize) { de = dictGetRandomKey(server.sharingpool); - assert(de != NULL); + redisAssert(de != NULL); c = ((unsigned long) dictGetEntryVal(de))-1; dictGetEntryVal(de) = (void*) c; if (c == 0) { @@ -2171,7 +2255,7 @@ static robj *tryObjectSharing(robj *o) { int retval; retval = dictAdd(server.sharingpool,o,(void*)1); - assert(retval == DICT_OK); + redisAssert(retval == DICT_OK); incrRefCount(o); } return o; @@ -2214,7 +2298,7 @@ static int tryObjectEncoding(robj *o) { if (o->refcount > 1) return REDIS_ERR; /* Currently we try to encode only strings */ - assert(o->type == REDIS_STRING); + redisAssert(o->type == REDIS_STRING); /* Check if we can represent this string as a long integer */ if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR; @@ -2226,11 +2310,15 @@ static int tryObjectEncoding(robj *o) { return REDIS_OK; } -/* Get a decoded version of an encoded object (returned as a new object) */ -static robj *getDecodedObject(const robj *o) { +/* Get a decoded version of an encoded object (returned as a new object). + * If the object is already raw-encoded just increment the ref count. */ +static robj *getDecodedObject(robj *o) { robj *dec; - assert(o->encoding != REDIS_ENCODING_RAW); + if (o->encoding == REDIS_ENCODING_RAW) { + incrRefCount(o); + return o; + } if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) { char buf[32]; @@ -2238,16 +2326,20 @@ static robj *getDecodedObject(const robj *o) { dec = createStringObject(buf,strlen(buf)); return dec; } else { - assert(1 != 1); + redisAssert(1 != 1); } } /* Compare two string objects via strcmp() or alike. * Note that the objects may be integer-encoded. In such a case we * use snprintf() to get a string representation of the numbers on the stack - * and compare the strings, it's much faster than calling getDecodedObject(). */ + * and compare the strings, it's much faster than calling getDecodedObject(). + * + * Important note: if objects are not integer encoded, but binary-safe strings, + * sdscmp() from sds.c will apply memcmp() so this function ca be considered + * binary safe. */ static int compareStringObjects(robj *a, robj *b) { - assert(a->type == REDIS_STRING && b->type == REDIS_STRING); + redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING); char bufa[128], bufb[128], *astr, *bstr; int bothsds = 1; @@ -2270,7 +2362,7 @@ static int compareStringObjects(robj *a, robj *b) { } static size_t stringObjectLen(robj *o) { - assert(o->type == REDIS_STRING); + redisAssert(o->type == REDIS_STRING); if (o->encoding == REDIS_ENCODING_RAW) { return sdslen(o->ptr); } else { @@ -2419,16 +2511,11 @@ static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) { /* Like rdbSaveStringObjectRaw() but handle encoded objects */ static int rdbSaveStringObject(FILE *fp, robj *obj) { int retval; - robj *dec; - if (obj->encoding != REDIS_ENCODING_RAW) { - dec = getDecodedObject(obj); - retval = rdbSaveStringObjectRaw(fp,dec); - decrRefCount(dec); - return retval; - } else { - return rdbSaveStringObjectRaw(fp,obj); - } + obj = getDecodedObject(obj); + retval = rdbSaveStringObjectRaw(fp,obj); + decrRefCount(obj); + return retval; } /* Save a double value. Doubles are saved as strings prefixed by an unsigned @@ -2548,7 +2635,7 @@ static int rdbSave(char *filename) { } dictReleaseIterator(di); } else { - assert(0 != 0); + redisAssert(0 != 0); } } dictReleaseIterator(di); @@ -2584,7 +2671,7 @@ werr: static int rdbSaveBackground(char *filename) { pid_t childpid; - if (server.bgsaveinprogress) return REDIS_ERR; + if (server.bgsavechildpid != -1) return REDIS_ERR; if ((childpid = fork()) == 0) { /* Child */ close(server.fd); @@ -2601,7 +2688,6 @@ static int rdbSaveBackground(char *filename) { return REDIS_ERR; } redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid); - server.bgsaveinprogress = 1; server.bgsavechildpid = childpid; return REDIS_OK; } @@ -2683,7 +2769,7 @@ static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { val = (int32_t)v; } else { val = 0; /* anti-warning */ - assert(0!=0); + redisAssert(0!=0); } return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val)); } @@ -2722,7 +2808,7 @@ static robj *rdbLoadStringObject(FILE*fp, int rdbver) { case REDIS_RDB_ENC_LZF: return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver)); default: - assert(0!=0); + redisAssert(0!=0); } } @@ -2747,6 +2833,7 @@ static int rdbLoadDoubleValue(FILE *fp, double *val) { case 253: *val = R_Nan; return 0; default: if (fread(buf,len,1,fp) == 0) return -1; + buf[len] = '\0'; sscanf(buf, "%lg", val); return 0; } @@ -2848,7 +2935,7 @@ static int rdbLoad(char *filename) { incrRefCount(ele); /* added to skiplist */ } } else { - assert(0 != 0); + redisAssert(0 != 0); } /* Add the new object in the hash table */ retval = dictAdd(d,keyobj,o); @@ -2902,6 +2989,7 @@ static void echoCommand(redisClient *c) { static void setGenericCommand(redisClient *c, int nx) { int retval; + if (nx) deleteIfVolatile(c->db,c->argv[1]); retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); if (retval == DICT_ERR) { if (!nx) { @@ -2977,26 +3065,30 @@ static void mgetCommand(redisClient *c) { } static void msetGenericCommand(redisClient *c, int nx) { - int j; + int j, busykeys = 0; if ((c->argc % 2) == 0) { - addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n")); + addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n")); return; } /* Handle the NX flag. The MSETNX semantic is to return zero and don't * set nothing at all if at least one already key exists. */ if (nx) { for (j = 1; j < c->argc; j += 2) { - if (dictFind(c->db->dict,c->argv[j]) != NULL) { - addReply(c, shared.czero); - return; + if (lookupKeyWrite(c->db,c->argv[j]) != NULL) { + busykeys++; } } } + if (busykeys) { + addReply(c, shared.czero); + return; + } for (j = 1; j < c->argc; j += 2) { int retval; + tryObjectEncoding(c->argv[j+1]); retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]); if (retval == DICT_ERR) { dictReplace(c->db->dict,c->argv[j],c->argv[j+1]); @@ -3038,7 +3130,7 @@ static void incrDecrCommand(redisClient *c, long long incr) { else if (o->encoding == REDIS_ENCODING_INT) value = (long)o->ptr; else - assert(1 != 1); + redisAssert(1 != 1); } } @@ -3136,7 +3228,7 @@ static void keysCommand(redisClient *c) { dictEntry *de; sds pattern = c->argv[1]->ptr; int plen = sdslen(pattern); - int numkeys = 0, keyslen = 0; + unsigned long numkeys = 0, keyslen = 0; robj *lenobj = createObject(REDIS_STRING,NULL); di = dictGetIterator(c->db->dict); @@ -3193,7 +3285,7 @@ static void typeCommand(redisClient *c) { } static void saveCommand(redisClient *c) { - if (server.bgsaveinprogress) { + if (server.bgsavechildpid != -1) { addReplySds(c,sdsnew("-ERR background save in progress\r\n")); return; } @@ -3205,12 +3297,13 @@ static void saveCommand(redisClient *c) { } static void bgsaveCommand(redisClient *c) { - if (server.bgsaveinprogress) { + if (server.bgsavechildpid != -1) { addReplySds(c,sdsnew("-ERR background save already in progress\r\n")); return; } if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { - addReply(c,shared.ok); + char *status = "+Background saving started\r\n"; + addReplySds(c,sdsnew(status)); } else { addReply(c,shared.err); } @@ -3221,7 +3314,7 @@ static void shutdownCommand(redisClient *c) { /* Kill the saving child if there is a background saving in progress. We want to avoid race conditions, for instance our saving child may overwrite the synchronous saving did by SHUTDOWN. */ - if (server.bgsaveinprogress) { + if (server.bgsavechildpid != -1) { redisLog(REDIS_WARNING,"There is a live saving child. Killing it!"); kill(server.bgsavechildpid,SIGKILL); rdbRemoveTempFile(server.bgsavechildpid); @@ -3793,7 +3886,7 @@ static void scardCommand(redisClient *c) { addReply(c,shared.wrongtypeerr); } else { s = o->ptr; - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", + addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n", dictSize(s))); } } @@ -3858,12 +3951,12 @@ static int qsortCompareSetsByCardinality(const void *s1, const void *s2) { return dictSize(*d1)-dictSize(*d2); } -static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) { +static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) { dict **dv = zmalloc(sizeof(dict*)*setsnum); dictIterator *di; dictEntry *de; robj *lenobj = NULL, *dstset = NULL; - int j, cardinality = 0; + unsigned long j, cardinality = 0; for (j = 0; j < setsnum; j++) { robj *setobj; @@ -3875,7 +3968,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r zfree(dv); if (dstkey) { deleteKey(c->db,dstkey); - addReply(c,shared.ok); + addReply(c,shared.czero); } else { addReply(c,shared.nullmultibulk); } @@ -3940,9 +4033,9 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r } if (!dstkey) { - lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality); + lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality); } else { - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", + addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n", dictSize((dict*)dstset->ptr))); server.dirty++; } @@ -4044,7 +4137,7 @@ static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnu if (!dstkey) { decrRefCount(dstset); } else { - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n", + addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n", dictSize((dict*)dstset->ptr))); server.dirty++; } @@ -4330,14 +4423,14 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor /* case 2: Score update operation */ de = dictFind(zs->dict,ele); - assert(de != NULL); + redisAssert(de != NULL); oldscore = dictGetEntryVal(de); if (*score != *oldscore) { int deleted; /* Remove and insert the element in the skip list with new score */ deleted = zslDelete(zs->zsl,*oldscore,ele); - assert(deleted != 0); + redisAssert(deleted != 0); zslInsert(zs->zsl,*score,ele); incrRefCount(ele); /* Update the score in the hash table */ @@ -4392,7 +4485,7 @@ static void zremCommand(redisClient *c) { /* Delete from the skiplist */ oldscore = dictGetEntryVal(de); deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]); - assert(deleted != 0); + redisAssert(deleted != 0); /* Delete from the hash table */ dictDelete(zs->dict,c->argv[2]); @@ -4496,6 +4589,20 @@ static void zrangebyscoreCommand(redisClient *c) { robj *o; double min = strtod(c->argv[2]->ptr,NULL); double max = strtod(c->argv[3]->ptr,NULL); + int offset = 0, limit = -1; + + if (c->argc != 4 && c->argc != 7) { + addReplySds(c, + sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n")); + return; + } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) { + addReply(c,shared.syntaxerr); + return; + } else if (c->argc == 7) { + offset = atoi(c->argv[5]->ptr); + limit = atoi(c->argv[6]->ptr); + if (offset < 0) offset = 0; + } o = lookupKeyRead(c->db,c->argv[1]); if (o == NULL) { @@ -4524,14 +4631,22 @@ static void zrangebyscoreCommand(redisClient *c) { * it later */ lenobj = createObject(REDIS_STRING,NULL); addReply(c,lenobj); + decrRefCount(lenobj); while(ln && ln->score <= max) { + if (offset) { + offset--; + ln = ln->forward[0]; + continue; + } + if (limit == 0) break; ele = ln->obj; addReplyBulkLen(c,ele); addReply(c,ele); addReply(c,shared.crlf); ln = ln->forward[0]; rangelen++; + if (limit > 0) limit--; } lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen); } @@ -4551,7 +4666,7 @@ static void zcardCommand(redisClient *c) { addReply(c,shared.wrongtypeerr); } else { zs = o->ptr; - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length)); + addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length)); } } } @@ -4628,15 +4743,9 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { } /* The substitution object may be specially encoded. If so we create - * a decoded object on the fly. */ - if (subst->encoding == REDIS_ENCODING_RAW) - /* If we don't need to get a decoded object increment the refcount - * so that the final decrRefCount() call will restore the original - * count */ - incrRefCount(subst); - else { - subst = getDecodedObject(subst); - } + * a decoded object on the fly. Otherwise getDecodedObject will just + * increment the ref count, that we'll decrement later. */ + subst = getDecodedObject(subst); ssub = subst->ptr; if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL; @@ -4655,10 +4764,7 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { keyname.buf[prefixlen+sublen+postfixlen] = '\0'; keyname.len = prefixlen+sublen+postfixlen; - keyobj.refcount = 1; - keyobj.type = REDIS_STRING; - keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2); - + initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2)) decrRefCount(subst); /* printf("lookup '%s' => %p\n", keyname.buf,de); */ @@ -4698,20 +4804,13 @@ static int sortCompare(const void *s1, const void *s2) { } } else { /* Compare elements directly */ - if (so1->obj->encoding == REDIS_ENCODING_RAW && - so2->obj->encoding == REDIS_ENCODING_RAW) { - cmp = strcoll(so1->obj->ptr,so2->obj->ptr); - } else { - robj *dec1, *dec2; - - dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ? - so1->obj : getDecodedObject(so1->obj); - dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ? - so2->obj : getDecodedObject(so2->obj); - cmp = strcoll(dec1->ptr,dec2->ptr); - if (dec1 != so1->obj) decrRefCount(dec1); - if (dec2 != so2->obj) decrRefCount(dec2); - } + robj *dec1, *dec2; + + dec1 = getDecodedObject(so1->obj); + dec2 = getDecodedObject(so2->obj); + cmp = strcoll(dec1->ptr,dec2->ptr); + decrRefCount(dec1); + decrRefCount(dec2); } } return server.sort_desc ? -cmp : cmp; @@ -4735,7 +4834,9 @@ static void sortCommand(redisClient *c) { addReply(c,shared.nokeyerr); return; } - if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) { + if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST && + sortval->type != REDIS_ZSET) + { addReply(c,shared.wrongtypeerr); return; } @@ -4788,11 +4889,15 @@ static void sortCommand(redisClient *c) { } /* Load the sorting vector with all the objects to sort */ - vectorlen = (sortval->type == REDIS_LIST) ? - listLength((list*)sortval->ptr) : - dictSize((dict*)sortval->ptr); + switch(sortval->type) { + case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break; + case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break; + case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break; + default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */ + } vector = zmalloc(sizeof(redisSortObject)*vectorlen); j = 0; + if (sortval->type == REDIS_LIST) { list *list = sortval->ptr; listNode *ln; @@ -4806,10 +4911,17 @@ static void sortCommand(redisClient *c) { j++; } } else { - dict *set = sortval->ptr; + dict *set; dictIterator *di; dictEntry *setele; + if (sortval->type == REDIS_SET) { + set = sortval->ptr; + } else { + zset *zs = sortval->ptr; + set = zs->dict; + } + di = dictGetIterator(set); while((setele = dictNext(di)) != NULL) { vector[j].obj = dictGetEntryKey(setele); @@ -4819,7 +4931,7 @@ static void sortCommand(redisClient *c) { } dictReleaseIterator(di); } - assert(j == vectorlen); + redisAssert(j == vectorlen); /* Now it's time to load the right scores in the sorting vector */ if (dontsort == 0) { @@ -4830,20 +4942,18 @@ static void sortCommand(redisClient *c) { byval = lookupKeyByPattern(c->db,sortby,vector[j].obj); if (!byval || byval->type != REDIS_STRING) continue; if (alpha) { - if (byval->encoding == REDIS_ENCODING_RAW) { - vector[j].u.cmpobj = byval; - incrRefCount(byval); - } else { - vector[j].u.cmpobj = getDecodedObject(byval); - } + vector[j].u.cmpobj = getDecodedObject(byval); } else { if (byval->encoding == REDIS_ENCODING_RAW) { vector[j].u.score = strtod(byval->ptr,NULL); } else { + /* Don't need to decode the object if it's + * integer-encoded (the only encoding supported) so + * far. We can just cast it */ if (byval->encoding == REDIS_ENCODING_INT) { vector[j].u.score = (long)byval->ptr; } else - assert(1 != 1); + redisAssert(1 != 1); } } } else { @@ -4854,7 +4964,7 @@ static void sortCommand(redisClient *c) { if (vector[j].obj->encoding == REDIS_ENCODING_INT) vector[j].u.score = (long) vector[j].obj->ptr; else - assert(1 != 1); + redisAssert(1 != 1); } } } @@ -4909,7 +5019,7 @@ static void sortCommand(redisClient *c) { addReply(c,shared.crlf); } } else { - assert(sop->type == REDIS_SORT_GET); /* always fails */ + redisAssert(sop->type == REDIS_SORT_GET); /* always fails */ } } } @@ -4938,7 +5048,7 @@ static void sortCommand(redisClient *c) { incrRefCount(val); } } else { - assert(sop->type == REDIS_SORT_GET); /* always fails */ + redisAssert(sop->type == REDIS_SORT_GET); /* always fails */ } } } @@ -4962,7 +5072,10 @@ static void sortCommand(redisClient *c) { zfree(vector); } -static void infoCommand(redisClient *c) { +/* Create the string returned by the INFO command. This is decoupled + * by the INFO command itself as we need to report the same information + * on memory corruption problems. */ +static sds genRedisInfoString(void) { sds info; time_t uptime = time(NULL)-server.stat_starttime; int j; @@ -4970,27 +5083,31 @@ static void infoCommand(redisClient *c) { info = sdscatprintf(sdsempty(), "redis_version:%s\r\n" "arch_bits:%s\r\n" - "uptime_in_seconds:%d\r\n" - "uptime_in_days:%d\r\n" + "multiplexing_api:%s\r\n" + "uptime_in_seconds:%ld\r\n" + "uptime_in_days:%ld\r\n" "connected_clients:%d\r\n" "connected_slaves:%d\r\n" "used_memory:%zu\r\n" "changes_since_last_save:%lld\r\n" "bgsave_in_progress:%d\r\n" - "last_save_time:%d\r\n" + "last_save_time:%ld\r\n" + "bgrewriteaof_in_progress:%d\r\n" "total_connections_received:%lld\r\n" "total_commands_processed:%lld\r\n" "role:%s\r\n" ,REDIS_VERSION, (sizeof(long) == 8) ? "64" : "32", + aeGetApiName(), uptime, uptime/(3600*24), listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), server.usedmemory, server.dirty, - server.bgsaveinprogress, + server.bgsavechildpid != -1, server.lastsave, + server.bgrewritechildpid != -1, server.stat_numconnections, server.stat_numcommands, server.masterhost == NULL ? "master" : "slave" @@ -5014,11 +5131,17 @@ static void infoCommand(redisClient *c) { keys = dictSize(server.db[j].dict); vkeys = dictSize(server.db[j].expires); if (keys || vkeys) { - info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n", + info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n", j, keys, vkeys); } } - addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info))); + return info; +} + +static void infoCommand(redisClient *c) { + sds info = genRedisInfoString(); + addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", + (unsigned long)sdslen(info))); addReplySds(c,info); addReply(c,shared.crlf); } @@ -5217,7 +5340,7 @@ static void syncCommand(redisClient *c) { redisLog(REDIS_NOTICE,"Slave ask for synchronization"); /* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ - if (server.bgsaveinprogress) { + if (server.bgsavechildpid != -1) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save */ @@ -5467,6 +5590,7 @@ static int syncWithMaster(void) { } server.master = createClient(fd); server.master->flags |= REDIS_MASTER; + server.master->authenticated = 1; server.replstate = REDIS_REPL_CONNECTED; return REDIS_OK; } @@ -5560,8 +5684,8 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv char seldb[64]; snprintf(seldb,sizeof(seldb),"%d",dictid); - buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", - strlen(seldb),seldb); + buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", + (unsigned long)strlen(seldb),seldb); server.appendseldb = dictid; } @@ -5584,13 +5708,11 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv for (j = 0; j < argc; j++) { robj *o = argv[j]; - if (o->encoding != REDIS_ENCODING_RAW) - o = getDecodedObject(o); - buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr)); + o = getDecodedObject(o); + buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr)); buf = sdscatlen(buf,o->ptr,sdslen(o->ptr)); buf = sdscatlen(buf,"\r\n",2); - if (o != argv[j]) - decrRefCount(o); + decrRefCount(o); } /* Free the objects from the modified argv for EXPIREAT */ @@ -5616,6 +5738,14 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv } exit(1); } + /* If a background append only file rewriting is in progress we want to + * accumulate the differences between the child DB and the current one + * in a buffer, so that when the child process will do its work we + * can append the differences to the new append only file. */ + if (server.bgrewritechildpid != -1) + server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); + + sdsfree(buf); now = time(NULL); if (server.appendfsync == APPENDFSYNC_ALWAYS || (server.appendfsync == APPENDFSYNC_EVERYSEC && @@ -5691,7 +5821,7 @@ int loadAppendOnlyFile(char *filename) { if (buf[0] != '$') goto fmterr; len = strtol(buf+1,NULL,10); argsds = sdsnewlen(NULL,len); - if (fread(argsds,len,1,fp) == 0) goto fmterr; + if (len && fread(argsds,len,1,fp) == 0) goto fmterr; argv[j] = createObject(REDIS_STRING,argsds); if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */ } @@ -5737,11 +5867,260 @@ fmterr: exit(1); } +/* Write an object into a file in the bulk format $\r\n\r\n */ +static int fwriteBulk(FILE *fp, robj *obj) { + char buf[128]; + obj = getDecodedObject(obj); + snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr)); + if (fwrite(buf,strlen(buf),1,fp) == 0) goto err; + if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err; + if (fwrite("\r\n",2,1,fp) == 0) goto err; + decrRefCount(obj); + return 1; +err: + decrRefCount(obj); + return 0; +} + +/* Write a double value in bulk format $\r\n\r\n */ +static int fwriteBulkDouble(FILE *fp, double d) { + char buf[128], dbuf[128]; + + snprintf(dbuf,sizeof(dbuf),"%.17g\r\n",d); + snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(dbuf)-2); + if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; + if (fwrite(dbuf,strlen(dbuf),1,fp) == 0) return 0; + return 1; +} + +/* Write a long value in bulk format $\r\n\r\n */ +static int fwriteBulkLong(FILE *fp, long l) { + char buf[128], lbuf[128]; + + snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l); + snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(lbuf)-2); + if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; + if (fwrite(lbuf,strlen(lbuf),1,fp) == 0) return 0; + return 1; +} + +/* Write a sequence of commands able to fully rebuild the dataset into + * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */ +static int rewriteAppendOnlyFile(char *filename) { + dictIterator *di = NULL; + dictEntry *de; + FILE *fp; + char tmpfile[256]; + int j; + time_t now = time(NULL); + + /* Note that we have to use a different temp name here compared to the + * one used by rewriteAppendOnlyFileBackground() function. */ + snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); + fp = fopen(tmpfile,"w"); + if (!fp) { + redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno)); + return REDIS_ERR; + } + for (j = 0; j < server.dbnum; j++) { + char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; + redisDb *db = server.db+j; + dict *d = db->dict; + if (dictSize(d) == 0) continue; + di = dictGetIterator(d); + if (!di) { + fclose(fp); + return REDIS_ERR; + } + + /* SELECT the new DB */ + if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkLong(fp,j) == 0) goto werr; + + /* Iterate this DB writing every entry */ + while((de = dictNext(di)) != NULL) { + robj *key = dictGetEntryKey(de); + robj *o = dictGetEntryVal(de); + time_t expiretime = getExpire(db,key); + + /* Save the key and associated value */ + if (o->type == REDIS_STRING) { + /* Emit a SET command */ + char cmd[]="*3\r\n$3\r\nSET\r\n"; + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + /* Key and value */ + if (fwriteBulk(fp,key) == 0) goto werr; + if (fwriteBulk(fp,o) == 0) goto werr; + } else if (o->type == REDIS_LIST) { + /* Emit the RPUSHes needed to rebuild the list */ + list *list = o->ptr; + listNode *ln; + + listRewind(list); + while((ln = listYield(list))) { + char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; + robj *eleobj = listNodeValue(ln); + + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulk(fp,key) == 0) goto werr; + if (fwriteBulk(fp,eleobj) == 0) goto werr; + } + } else if (o->type == REDIS_SET) { + /* Emit the SADDs needed to rebuild the set */ + dict *set = o->ptr; + dictIterator *di = dictGetIterator(set); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + char cmd[]="*3\r\n$4\r\nSADD\r\n"; + robj *eleobj = dictGetEntryKey(de); + + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulk(fp,key) == 0) goto werr; + if (fwriteBulk(fp,eleobj) == 0) goto werr; + } + dictReleaseIterator(di); + } else if (o->type == REDIS_ZSET) { + /* Emit the ZADDs needed to rebuild the sorted set */ + zset *zs = o->ptr; + dictIterator *di = dictGetIterator(zs->dict); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + char cmd[]="*4\r\n$4\r\nZADD\r\n"; + robj *eleobj = dictGetEntryKey(de); + double *score = dictGetEntryVal(de); + + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulk(fp,key) == 0) goto werr; + if (fwriteBulkDouble(fp,*score) == 0) goto werr; + if (fwriteBulk(fp,eleobj) == 0) goto werr; + } + dictReleaseIterator(di); + } else { + redisAssert(0 != 0); + } + /* Save the expire time */ + if (expiretime != -1) { + char cmd[]="*3\r\n$6\r\nEXPIRE\r\n"; + /* If this key is already expired skip it */ + if (expiretime < now) continue; + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulk(fp,key) == 0) goto werr; + if (fwriteBulkLong(fp,expiretime) == 0) goto werr; + } + } + dictReleaseIterator(di); + } + + /* Make sure data will not remain on the OS's output buffers */ + fflush(fp); + fsync(fileno(fp)); + fclose(fp); + + /* Use RENAME to make sure the DB file is changed atomically only + * if the generate DB file is ok. */ + if (rename(tmpfile,filename) == -1) { + redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); + unlink(tmpfile); + return REDIS_ERR; + } + redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed"); + return REDIS_OK; + +werr: + fclose(fp); + unlink(tmpfile); + redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno)); + if (di) dictReleaseIterator(di); + return REDIS_ERR; +} + +/* This is how rewriting of the append only file in background works: + * + * 1) The user calls BGREWRITEAOF + * 2) Redis calls this function, that forks(): + * 2a) the child rewrite the append only file in a temp file. + * 2b) the parent accumulates differences in server.bgrewritebuf. + * 3) When the child finished '2a' exists. + * 4) The parent will trap the exit code, if it's OK, will append the + * data accumulated into server.bgrewritebuf into the temp file, and + * finally will rename(2) the temp file in the actual file name. + * The the new file is reopened as the new append only file. Profit! + */ +static int rewriteAppendOnlyFileBackground(void) { + pid_t childpid; + + if (server.bgrewritechildpid != -1) return REDIS_ERR; + if ((childpid = fork()) == 0) { + /* Child */ + char tmpfile[256]; + close(server.fd); + + snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); + if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { + exit(0); + } else { + exit(1); + } + } else { + /* Parent */ + if (childpid == -1) { + redisLog(REDIS_WARNING, + "Can't rewrite append only file in background: fork: %s", + strerror(errno)); + return REDIS_ERR; + } + redisLog(REDIS_NOTICE, + "Background append only file rewriting started by pid %d",childpid); + server.bgrewritechildpid = childpid; + /* We set appendseldb to -1 in order to force the next call to the + * feedAppendOnlyFile() to issue a SELECT command, so the differences + * accumulated by the parent into server.bgrewritebuf will start + * with a SELECT statement and it will be safe to merge. */ + server.appendseldb = -1; + return REDIS_OK; + } + return REDIS_OK; /* unreached */ +} + +static void bgrewriteaofCommand(redisClient *c) { + if (server.bgrewritechildpid != -1) { + addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n")); + return; + } + if (rewriteAppendOnlyFileBackground() == REDIS_OK) { + char *status = "+Background append only file rewriting started\r\n"; + addReplySds(c,sdsnew(status)); + } else { + addReply(c,shared.err); + } +} + +static void aofRemoveTempFile(pid_t childpid) { + char tmpfile[256]; + + snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid); + unlink(tmpfile); +} + /* ================================= Debugging ============================== */ static void debugCommand(redisClient *c) { if (!strcasecmp(c->argv[1]->ptr,"segfault")) { *((char*)-1) = 'x'; + } else if (!strcasecmp(c->argv[1]->ptr,"reload")) { + if (rdbSave(server.dbfilename) != REDIS_OK) { + addReply(c,shared.err); + return; + } + emptyDb(); + if (rdbLoad(server.dbfilename) != REDIS_OK) { + addReply(c,shared.err); + return; + } + redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD"); + addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) { dictEntry *de = dictFind(c->db->dict,c->argv[2]); robj *key, *val; @@ -5754,13 +6133,23 @@ static void debugCommand(redisClient *c) { val = dictGetEntryVal(de); addReplySds(c,sdscatprintf(sdsempty(), "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n", - key, key->refcount, val, val->refcount, val->encoding)); + (void*)key, key->refcount, (void*)val, val->refcount, + val->encoding)); } else { addReplySds(c,sdsnew( - "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT ]\r\n")); + "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT |RELOAD]\r\n")); } } +static void _redisAssert(char *estr) { + redisLog(REDIS_WARNING,"=== ASSERTION FAILED ==="); + redisLog(REDIS_WARNING,"==> %s\n",estr); +#ifdef HAVE_BACKTRACE + redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)"); + *((char*)-1) = 'x'; +#endif +} + /* =================================== Main! ================================ */ #ifdef __linux__ @@ -5790,6 +6179,7 @@ static void daemonize(void) { FILE *fp; if (fork() != 0) exit(0); /* parent exits */ + printf("New pid: %d\n", getpid()); setsid(); /* create a new session */ /* Every output goes to /dev/null. If Redis is daemonized but @@ -5820,8 +6210,8 @@ int main(int argc, char **argv) { } else { redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'"); } - initServer(); if (server.daemonize) daemonize(); + initServer(); redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION); #ifdef __linux__ linuxOvercommitMemoryWarning(); @@ -5852,7 +6242,11 @@ static void *getMcontextEip(ucontext_t *uc) { #elif defined(__dietlibc__) return (void*) uc->uc_mcontext.eip; #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6) + #if __x86_64__ + return (void*) uc->uc_mcontext->__ss.__rip; + #else return (void*) uc->uc_mcontext->__ss.__eip; + #endif #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6) #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__) return (void*) uc->uc_mcontext->__ss.__rip; @@ -5873,36 +6267,16 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) { char **messages = NULL; int i, trace_size = 0; unsigned long offset=0; - time_t uptime = time(NULL)-server.stat_starttime; ucontext_t *uc = (ucontext_t*) secret; + sds infostring; REDIS_NOTUSED(info); redisLog(REDIS_WARNING, "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig); - redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(), - "redis_version:%s; " - "uptime_in_seconds:%d; " - "connected_clients:%d; " - "connected_slaves:%d; " - "used_memory:%zu; " - "changes_since_last_save:%lld; " - "bgsave_in_progress:%d; " - "last_save_time:%d; " - "total_connections_received:%lld; " - "total_commands_processed:%lld; " - "role:%s;" - ,REDIS_VERSION, - uptime, - listLength(server.clients)-listLength(server.slaves), - listLength(server.slaves), - server.usedmemory, - server.dirty, - server.bgsaveinprogress, - server.lastsave, - server.stat_numconnections, - server.stat_numcommands, - server.masterhost == NULL ? "master" : "slave" - )); + infostring = genRedisInfoString(); + redisLog(REDIS_WARNING, "%s",infostring); + /* It's not safe to sdsfree() the returned string under memory + * corruption conditions. Let it leak as we are going to abort */ trace_size = backtrace(trace, 100); /* overwrite sigaction with caller's address */ @@ -5921,7 +6295,7 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) { redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset); } } - free(messages); + // free(messages); Don't call free() with possibly corrupted memory. exit(0); }